多线程返回结果

mac2024-04-15  39

package com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.applogstatistics_p2.service.impl; import com.aebiz.baseframework4ms.basecrud.service.BaseServiceImpl; import com.aebiz.plugins.b2b.statistics.callback.plugins.callback_p2.dto.AppUrlVisitLogDTO; import com.aebiz.plugins.b2b.statistics.esbasic.plugins.esbasic_p2.service.EsBasic_P2Service; import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.applogstatistics_p2.service.BaseAppVisitStatisticsLog_P2Service; import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.base.dao.BaseAppVisitStatisticsLogDAO; import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.vo.BaseAppVisitStatisticsLogModel; import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.vo.BaseAppVisitStatisticsLogQueryModel; import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @Service public class BaseAppVisitStatisticsLog_P2ServiceImpl<M extends BaseAppVisitStatisticsLogModel,QM extends BaseAppVisitStatisticsLogQueryModel> extends BaseServiceImpl<M,QM> implements BaseAppVisitStatisticsLog_P2Service<M,QM> { private static Logger log = LoggerFactory.getLogger(BaseAppVisitStatisticsLog_P2ServiceImpl.class); @Autowired private EsBasic_P2Service esBasic_p2Service; /** * 处理数据线程池大小 */ private static final int nThreads=10; /** * 索引名称 */ private static final String INDEX_NAME="api_call_log"; /** * 类型 */ private static final String TYPE_NAME="api_call_log"; private BaseAppVisitStatisticsLogDAO myDAO; ExecutorService executorService = Executors.newFixedThreadPool(nThreads); @Autowired public void setMyDAO(BaseAppVisitStatisticsLogDAO myDAO) { this.myDAO = myDAO; super.setDao(myDAO); } /** * 统计指定天数app访问记录任务 * @param dateStr */ @Override public void appVisitJob(String dateStr) { try { String str = esBasic_p2Service.termQueryByCond(dateStr, "requestTime", INDEX_NAME, TYPE_NAME); if (StringUtils.isBlank(str)){ return; } List<AppUrlVisitLogDTO> appUrlVisitLogDTOS = JSON.parseArray(str, AppUrlVisitLogDTO.class); // Map<appId,Map<url,List<ApiCallLogModel>>> Map<String, Map<String,List<AppUrlVisitLogDTO>>> appIdMap = appUrlVisitLogDTOS.parallelStream() .collect(Collectors.groupingBy(AppUrlVisitLogDTO::getAppId,Collectors.groupingBy(AppUrlVisitLogDTO::getUrl))); for (Map.Entry<String, Map<String, List<AppUrlVisitLogDTO>>> appIdMapEntry : appIdMap.entrySet()) { String appId = appIdMapEntry.getKey(); Map<String, List<AppUrlVisitLogDTO>> urlMap = appIdMapEntry.getValue(); for (Map.Entry<String, List<AppUrlVisitLogDTO>> urlMapEntry : urlMap.entrySet()) { String url = urlMapEntry.getKey(); List<AppUrlVisitLogDTO> listEntry = urlMapEntry.getValue(); TaskCall taskCall = new TaskCall(appId,url,dateStr,listEntry); executorService.submit(taskCall); } } }catch (Exception e) { log.error("appVisitJob Exception date="+dateStr,e); } } class TaskCall implements Callable<Boolean> { private String appId; private String url; private String dateStr; public TaskCall(String appId, String url, String dateStr, List<AppUrlVisitLogDTO> appUrlVisitLogDTOList) { this.appId = appId; this.url = url; this.dateStr = dateStr; this.appUrlVisitLogDTOList = appUrlVisitLogDTOList; } private List<AppUrlVisitLogDTO> appUrlVisitLogDTOList; public TaskCall(String appId, String url, List<AppUrlVisitLogDTO> appUrlVisitLogDTOList) { this.appId = appId; this.url = url; this.appUrlVisitLogDTOList = appUrlVisitLogDTOList; } @Override public Boolean call() { StringBuffer requestSb = new StringBuffer(); StringBuffer responseSb = new StringBuffer(); int successStatus = 0; Long totalCustomTime = 0L; M model = null; try { for (AppUrlVisitLogDTO item : appUrlVisitLogDTOList) { requestSb.append(item.getRequestBody()); responseSb.append(item.getResponseBody()); if (Objects.equals(item.getStatusCode(),"200")){ ++successStatus; } totalCustomTime+=item.getConsumeTime(); } model = (M) myDAO.getModel(); Integer total = appUrlVisitLogDTOList.size(); int avgCustomTime = (int) (totalCustomTime / total); model.setSuccessNum(successStatus); model.setFailNum(total-successStatus); model.setRequestByte(requestSb.toString().getBytes().length); model.setResponseByte(responseSb.toString().getBytes().length); model.setTimeComsumeAvg(avgCustomTime); model.setTotalNum(total); model.setAppId(appId); model.setUrl(url); model.setStDate(dateStr); myDAO.create(model); }catch (Exception e) { log.error("create BaseAppVisitStatisticsLogModel Exception BaseAppVisitStatisticsLogModel:"+JSON.toJSONString(model),e); return Boolean.FALSE; } return Boolean.TRUE; } } }
最新回复(0)