集合类不安全之并发修改异常以及线程池

mac2025-05-04  5

集合类不安全之并发修改异常以及线程池

1、ArrayList线程不安全的例子?

1.故障现象: -----|java.util.ConcurrentModificationExcption >>高并发异常

2.导致原因: 并发争抢修改导致,参考我们的花名册签名情况。 一个人正在写入,另一同学过来抢夺,导致数据不一致异常。并发修改异常

3.解决办法: 3.1 new Vector<>(); 3.2 Collections.synchronizedList(new ArrayList<>()); 3.3 new CopyOnWriteArrayList<>(); 写时复制:copyOnWrite容器即写时复制的容器。向一个容器添加元素时,不直接向当前Object[]添加,而是将当前容器进行copy,得到Object[] newElements,然后向新的容器Object[] newElements里添加元素,再将原容器的引用指向新的容器setArray(newElements);这样做的好处是可以对CopyOnWrite容器进行并发的读。而不需加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。 4.优化建议:

2、set线程不安全的例子?

​ 解决方法: ​ Collections.synchronizedSet(new HashSet<>()); ​ new CopyOnWriteSet<>(); — 底层源码还是一个CopyOnWriteArrayList

​ HashSet — 底层是HashMap,那为什add(key),参数只有一个? ​ 源码:这个参数是put(key,value)的key,value人家已经定义好了。

3、Map线程不安全的例子?

​ 解决方法: ​ Collections.synchronizedMap(new HashMap<>()); ​ new ConcurrentHashMap<>(); — 底层源码还是一个?

4、公平锁/非公平锁/可重入锁/自旋锁,谈谈理解,手写一个自旋锁?

并发包中ReentrantLock的创建可以指定构造函数的Boolean类型来得到公平锁或非公平锁,默认是非公平锁。 公平锁:多个线程按照申请锁的顺序来获取锁,类似排队打饭,先来后到 非公平锁:多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的同学比先申请的线程优先获取锁,在高并发的情况下,有可能会造成优先级反转或饥饿现象。 可重入锁(递归锁):最大作用----避免死锁 同一线程外层函数获得锁之后,内层递归函数任然能获取该锁的代码,在同一线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。 也就是,线程可以进入任何一个它已经拥有的锁所同步者的代码块。 ReentrantLock/Synchronized就是一个典型的可重入锁 自旋锁(spinlock):指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU import java.sql.Time; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class TestMyLock { //原子引用线程 AtomicReference<Thread> atomicReference = new AtomicReference<>(); Thread thread = new Thread(); public void myLock(){ System.out.println(Thread.currentThread().getName()+"\t come in ……"); while(!atomicReference.compareAndSet(null,thread)){} } public void myUnLock(){ atomicReference.compareAndSet(thread,null); System.out.println(Thread.currentThread().getName()+"\t invoked myUnLock"); } public static void main(String[] args) { TestMyLock testMyLock = new TestMyLock(); new Thread(() -> { testMyLock.myLock(); //睡眠一会 try{ TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); } testMyLock.myUnLock(); },"AA").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { testMyLock.myLock(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } testMyLock.myUnLock(); },"BB").start(); } }

独占锁:指该锁一次只能被一个线程所持有,对ReentrantLock和Synchronized而言都是独占锁

共享锁:指该锁可被多个线程所持有。

对ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁。 读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。 import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; //资源类 class MyCache{ private volatile Map<String,Object> map = new HashMap<>(); //private Lock lock = new ReentrantLock();//每次只能有一个线程同时进行读写操作 private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); //写 public void put(String key,Object value){ reentrantReadWriteLock.writeLock().lock();//加锁 try{ System.out.println(Thread.currentThread().getName()+"\t 正在写入:"+key); //线程睡眠一会 try { TimeUnit.MICROSECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); System.out.println(Thread.currentThread().getName()+"\t 写入完成:"+key); }catch (Exception e){ e.printStackTrace(); }finally { reentrantReadWriteLock.writeLock().unlock();//释放锁 } } //读 public void get(String key){ reentrantReadWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName()+"\t 正在读取:"+key); //线程睡眠一会 try { TimeUnit.MICROSECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } Object result = map.get(key); System.out.println(Thread.currentThread().getName()+"\t 读取完成:"+key); }catch (Exception e){ e.printStackTrace(); }finally { reentrantReadWriteLock.readLock().unlock(); } } } /** * 多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源该可以同时进行 * 但是 * 如果一个线程想去写共享资源来,就不应该再有其他线程可以对该资源进行读写 * 小总结: * 读- 读能共存 * 读- 写不能共存 * 写- 写不能共存 * * 写操作:原子+独占 */ public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache = new MyCache(); //5个线程写操作 for (int i = 1; i <= 5 ; i++) { final int tempInt = i; new Thread(()->{ myCache.put(tempInt+"",tempInt); },String.valueOf(i)).start(); } //5个线程读操作 for (int i = 1; i <= 5 ; i++) { final int tempInt = i; new Thread(()->{ myCache.get(tempInt+""); },String.valueOf(i)).start(); } } }

5、CountDownLatch/CyclicBarrier/Semaphore使用过吗?

​ CountDownLatch: ​ 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒 ​ CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞。 ​ 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为0时,因调用await方法被阻塞的线程会被唤醒,继续执行。

public enum CountryEnum { ONE(1,"齐"),TWO(2,"楚"),THREE(3,"燕"),FOUR(4,"赵"),FIVE(5,"魏"),SIX(6,"韩"); private Integer retCode; private String retMessage; public Integer getRetCode() { return retCode; } public String getRetMessage() { return retMessage; } CountryEnum(Integer retCode, String retMessage) { this.retCode = retCode; this.retMessage = retMessage; } public static CountryEnum forEach_CountryEnum(int index){ CountryEnum[] myArray = CountryEnum.values(); for(CountryEnum element : myArray){ if(index == element.getRetCode()) return element; } return null; } } import java.util.concurrent.CountDownLatch; public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6);//从6倒计时 for (int i = 1; i <= 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t国,被灭……"); countDownLatch.countDown(); },CountryEnum.forEach_CountryEnum(i).getRetMessage()).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+">>>>>>>>>大秦一统天下!"); } }

CyclicBarrier:字面意思是可循环(Cyclic)使用的屏障(Barrier)。他要做的就是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。

import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()-> {System.out.println(">>>>>召唤神龙");}); for(int i=1;i<=7;i++){ final int tempInt = i; new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t收集到第:"+tempInt+"颗龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } } Semaphore:信号量 两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

6、阻塞队列:

​ 用在哪? ​ 生产者消费者模式 ​ 线程池 ​ 消息中间件

SynchronousQueue没有容量。 与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue. 每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

7、Synchronized和Lock有什么区别?用新的Lock有什么好处?举例说说

​ 1.原始构成: ​ synchronized:关键字属于JVM层面; ​ monitorenter(底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象,只有在同步块或方法中才能调用wait/notify等方法) ​ monitorexit–释放锁 ​ Lock:具体类(java.util.concurrent.locks.lock)是api层面的锁 ​ 2.使用方法: ​ synchronized:不需要用户手动释放锁,当synchronized代码执行完后系统会自动让线程释放对锁的占用; ​ ReentrantLock:需要用户手动去释放锁,若没有主动释放锁,就有可能导致出现死锁现象。 ​ 需要lock()和unlock()方法配合try/finally语句块来完成。 ​ 3.等待是否可中断: ​ synchronized:不可中断,除非抛出异常或者正常运行完成 ​ ReentrantLock:可中断 ​ 1)设置超时方法tryLock(long timeout,TimeUnit unit); ​ 2)lockInterruptibly()放代码块中,调用interrupt()方法可中断。 ​ 4.加锁是否公平 ​ synchronized:非公平锁 ​ ReentrantLock:两者都可以,默认是非公平锁,构造方法可以传入Boolean值,true为公平锁,false为非公平锁 ​ 5.锁绑定多个条件Condition ​ synchronized:没有 ​ ReentrantLock:用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程,要么全部唤醒。 ​

题目:多线程之间按顺序调用,实现A->B->C三个线程启动,要求如下: AA打印5次,BB打印10次,CC打印15次 紧接着 AA打印5次,BB打印10次,CC打印15次 ………… 走10轮 import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 题目:多线程之间按顺序调用,实现A->B->C三个线程启动,要求如下: * AA打印5次,BB打印10次,CC打印15次 * 紧接着 * AA打印5次,BB打印10次,CC打印15次 * ………… * 走10轮 */ class ShareResource{ private int number = 1;//A:1 B:2 C:3 标志位 private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition(); //线程A public void print5(){ lock.lock(); try { //1 判断 while (number != 1) { c1.await(); } //2 干活 for (int i = 1; i <= 5; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } //3 通知--下一个(指定)线程干活 number = 2; c2.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } //线程B public void print10(){ lock.lock(); try { //1 判断 while (number != 2) { c2.await(); } //2 干活 for (int i = 1; i <= 10; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } //3 通知--下一个(指定)线程干活 number = 3; c3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } //线程C public void print15(){ lock.lock(); try { //1 判断 while (number != 3) { c3.await(); } //2 干活 for (int i = 1; i <= 15; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } //3 通知--下一个(指定)线程干活 number = 1; c1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } public class SyncAndReentrantLockDemo { public static void main(String[] args) { ShareResource shareResource = new ShareResource(); new Thread(()->{ for (int i = 1; i <= 10; i++) { shareResource.print5(); } },"A").start(); new Thread(()->{ for (int i = 1; i <= 10; i++) { shareResource.print10(); } },"B").start(); new Thread(()->{ for (int i = 1; i <= 10; i++) { shareResource.print15(); } },"C").start(); } } 在多线程领域,所谓阻塞,在某些情况下会挂起线程(阻塞),一旦条件满足,被挂起的线程又会自动被唤醒 为什么需要BlockingQueue 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你解决了 在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其要兼顾效率和线程安全,给我们带来了不小的复杂度。 package com.itguigu; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; class MyRescource{ private volatile boolean FLAG = true;//默认开启,进行生产+消费 private AtomicInteger atomicInter = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyRescource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd() throws Exception{ String data = null; boolean retValue; while(FLAG){ data = atomicInter.incrementAndGet()+""; retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS); if(retValue){ System.out.println(Thread.currentThread().getName()+"\t 生产:蛋糕"+data+"成功"); }else{ System.out.println(Thread.currentThread().getName()+"\t 生产:蛋糕"+data+"失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName()+"\t大老板叫停了,表示FLAG=false,生产动作结束"); } public void myConsumer() throws Exception{ String result = null; while(FLAG){ result = blockingQueue.poll(2L,TimeUnit.SECONDS); if(null == result || "".equalsIgnoreCase(result)){ FLAG = false; System.out.println(Thread.currentThread().getName()+"\t 超过连两秒没有取到蛋糕,退出"); System.out.println(); System.out.println(); return; } System.out.println(Thread.currentThread().getName()+"消费蛋糕:"+result+"成功"); } } public void stop() { this.FLAG = false; } } /** * volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用 */ public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) { MyRescource myRescource = new MyRescource(new ArrayBlockingQueue<>(10)); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t生产线程启动"); try { myRescource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } },"Prod").start(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t消费线程启动"); try { myRescource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } },"Consumer").start(); //线程暂停一会 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(); System.out.println(); System.out.println(); System.out.println("5秒钟时间到,大老板main线程叫停,活动结束"); myRescource.stop(); } }

8、线程池

​ 为什么用线程池,优势? ​ 线程池的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后再线程创建后启动这些任务,如果线程数量超过最大数量超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

特点:线程复用;控制最大并发数;管理线程。 1)降低资源消耗。通过重复利用自己创建的线程来降低线程创建和销毁造成的消耗。 2)提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。 3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统稳定性,使用线程池可以进行同一分配,调优和监控。 线程池如何使用? 线程池的几个重要参数介绍?(7大参数) corePoolSize:线程池中的常驻核心线程数 maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1 keepAliveTime:多余的空闲线程的存活时间。当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止 unit:keepAliveTime的单位 workQueue:任务队列,被提交但尚未被执行的任务 threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认即可。 handler:拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何拒绝 说说线程池的底层工作原理? package com.itguigu; import java.util.concurrent.*; /** * 线程池 * Array Arrays * Collection Collections * Executor Executors ---Executors工具类 * 第4中获得/使用java多线程的方式,线程池 */ public class MyThreadPoolDemo { public static void main(String[] args) { //自定义线程池,工作中使用 ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); //最大线程数=阻塞队列数+最大线程数 /* ThreadPoolExecutor:拒绝策略(4种) ThreadPoolExecutor.AbortPolicy()----粗暴,超过最大线程数,上来就报异常,默认 ThreadPoolExecutor.CallerRunsPolicy()---不会丢弃,也不会抛异常,而是回退给上级线程(如:main) ThreadPoolExecutor.DiscardOldestPolicy()---丢弃等待最久的任务,然后把当前任务加入队列中尝试再次提交任务 ThreadPoolExecutor.DiscardPolicy()---直接丢弃任务 */ try { for (int i=1;i<=9;i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"\t办理业务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } //调用线程 public static void threadPoolInit(){ //使用池化,不建议使用new ExecutorService threadPool = Executors.newFixedThreadPool(5);//固定数线程池 //ExecutorService threadPool = Executors.newSingleThreadExecutor();//1池1线程 //ExecutorService threadPool = Executors.newCachedThreadPool();//1池N线程 try { //模拟10个用户来办理业务,每个用户就是一个来自外部的请求线程 for (int i=1;i<=10;i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"\t办理业务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }

9、如何配置策略,你是如何考虑的?

两种:CPU密集型、IO密集型 CPUT密集型:该任务需要大量的运算,而没有阻塞,CPU一直全速运行。 CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程)

CPU密集型的任务配置尽可能少的线程数量: 一般公式:CPU核数+1个线程的线程池

IO密集: 1-> IO密集型的任务线程并不是一直在执行任务,则配置尽可能的多的线程 2-> 即该任务需要大量的IO,即大量的阻塞。 在单线程上运行IO密集型的任务会导致浪费大量的CPU运行能力浪费在等待。所以IO密集型任务中使用多线程可以大大加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

IO密集型时,大部分线程都阻塞,故需要多配置线程数: 参考公式:CPU核数/(1-阻塞系数) 阻塞系数在0.8~0.9之间

​ 比如8核CPU:8/(1-0.9) = 80个线程数

死锁编码及定位分析? 死锁产生原因:系统资源不足、进程运行推进的顺序不合理、资源分配不当

package com.itguigu; import java.util.concurrent.TimeUnit; class HoldLockThread implements Runnable{ private String lockA; private String lockB; public HoldLockThread(String lockA, String lockB) { this.lockA = lockA; this.lockB = lockB; } @Override public void run() { synchronized (lockA){ System.out.println(Thread.currentThread().getName()+"\t持有:"+lockA+"去尝试获取:"+lockB); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lockB){ System.out.println(Thread.currentThread().getName()+"\t持有:"+lockB+"去尝试获取:"+lockA); } } } } /** * 死锁 * 解决: * jps命令定位进程号 * jstack找到死锁查看 */ public class DeadLockDemo { public static void main(String[] args) { String lockA = "lockA"; String lockB = "lockB"; new Thread(new HoldLockThread(lockA,lockB),"ThreadAAA").start(); new Thread(new HoldLockThread(lockB,lockA),"ThreadBBB").start(); /* linux ps -ef|grep xxx ls -l window下的java云讯程序 也有类似ps的查看进程的命令,但是目前我们需要查看的是 jps = java ps jps -l */ } }
最新回复(0)