使用CountDownLatch进行并发处理

mac2025-06-15  12

比如后台需要并发刷新很多元素,逐个刷新比较慢,可以使用CountDownLatch进行并发刷新。

当请求特别多时候,很容易造成线程池满,此时应该考虑有一种线程保护机制,采用快速失败的方式对外抛出特定的异常。

上层业务感知该异常后,可提示前端繁忙,请稍后再试一试。

线程保护机制是创建线程执行器时候,采用SynchronousQueue队列,当请求超过线程池最大队列时候,线程池会直接抛出RejectedExecutionException,说明系统繁忙,此时应该通过限流的方式处理。

一、建立线程池工厂

public class ElementRefreshThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); protected final AtomicInteger threadNumber = new AtomicInteger(1); protected final ThreadGroup group; protected final String namePrefix; protected final boolean isDaemon; public ElementRefreshThreadFactory() { this("ElementRefresher"); } public ElementRefreshThreadFactory(String name) { this(name, true); } public ElementRefreshThreadFactory(String preffix, boolean daemon) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = preffix + "-" + poolNumber.getAndIncrement() + "-thread-"; isDaemon = daemon; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); t.setContextClassLoader(NamedThreadFactory.class.getClassLoader()); t.setPriority(Thread.MAX_PRIORITY); t.setDaemon(isDaemon); return t; } }

二、构建线程池及线程执行器

 这里提示系统繁忙需要由线程执行器抛出RejectedExecutionException,由业务系统捕获后封装成自己的系统忙异常再抛出到上层业务。

/** 使用 SynchronousQueue,同步队列,当请求数量突破极限1200个线程后,采用快速失败,提示系统繁忙,请重试的方式。 */ private final static ThreadFactory threadFactory = new ElementRefreshThreadFactory(); private final static ThreadPoolExecutor elementsRefreshExcutor = new ThreadPoolExecutor(100, 1200, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);

三、使用CountDownLatch并发处理

这一段代码并发刷新一些元素,注意如果线程中有处理失败的,此时一定要在finally中释放countDownLatch的数量。

public List<DisplayElementVo> refreshElements(ServiceContext context, List<DisplayElementVo> sceneElements) { if (CollectionUtils.isEmpty(sceneElements)) { return null; } List<DisplayElementVo> dispalyElements = new CopyOnWriteArrayList<>(); // 支持并发刷新所有元素 CountDownLatch countDownLatch = new CountDownLatch(sceneElements.size()); try { for (DisplayElementVo element : sceneElements) { elementsRefreshExcutor.execute(() -> { try { DisplayElementVo dispalyElement = this.refreshElement(context, element); if (null != dispalyElement) { dispalyElements.add(dispalyElement); } else { log.warn("refreshElementNull ,elementId:{}", element.getId()); } } catch(Exception e) {} finally { countDownLatch.countDown(); } }); } } catch (RejectedExecutionException e) { log.error("conCurrentRefreshElementsFailed ", e); //抛出一个业务自定义的系统忙异常给到上层业务 throw new ServiceSystemException(CommonErrorCode.SYSTEM_BUSY); // 这里是否要把所有的count全部释放????笔者也没有试验过。 } try { // 阻塞等待所有线程执行完成 countDownLatch.await(); } catch (InterruptedException e) { log.error("timeOutRefresh elements", e); } return dispalyElements; }

 

最新回复(0)