package com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.applogstatistics_p2.service.impl;
import com.aebiz.baseframework4ms.basecrud.service.BaseServiceImpl;
import com.aebiz.plugins.b2b.statistics.callback.plugins.callback_p2.dto.AppUrlVisitLogDTO;
import com.aebiz.plugins.b2b.statistics.esbasic.plugins.esbasic_p2.service.EsBasic_P2Service;
import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.applogstatistics_p2.service.BaseAppVisitStatisticsLog_P2Service;
import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.base.dao.BaseAppVisitStatisticsLogDAO;
import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.vo.BaseAppVisitStatisticsLogModel;
import com.aebiz.plugins.b2b.statistics.statistics.logstatistics.plugins.applogstatistics.vo.BaseAppVisitStatisticsLogQueryModel;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Service
public class BaseAppVisitStatisticsLog_P2ServiceImpl<M extends BaseAppVisitStatisticsLogModel,QM extends BaseAppVisitStatisticsLogQueryModel> extends BaseServiceImpl<M,QM> implements BaseAppVisitStatisticsLog_P2Service<M,QM> {
private static Logger log = LoggerFactory.getLogger(BaseAppVisitStatisticsLog_P2ServiceImpl.class);
@Autowired
private EsBasic_P2Service esBasic_p2Service;
/**
* 处理数据线程池大小
*/
private static final int nThreads=10;
/**
* 索引名称
*/
private static final String INDEX_NAME="api_call_log";
/**
* 类型
*/
private static final String TYPE_NAME="api_call_log";
private BaseAppVisitStatisticsLogDAO myDAO;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
@Autowired
public void setMyDAO(BaseAppVisitStatisticsLogDAO myDAO) {
this.myDAO = myDAO;
super.setDao(myDAO);
}
/**
* 统计指定天数app访问记录任务
* @param dateStr
*/
@Override
public void appVisitJob(String dateStr) {
try {
String str = esBasic_p2Service.termQueryByCond(dateStr, "requestTime", INDEX_NAME, TYPE_NAME);
if (StringUtils.isBlank(str)){
return;
}
List<AppUrlVisitLogDTO> appUrlVisitLogDTOS = JSON.parseArray(str, AppUrlVisitLogDTO.class);
// Map<appId,Map<url,List<ApiCallLogModel>>>
Map<String, Map<String,List<AppUrlVisitLogDTO>>> appIdMap = appUrlVisitLogDTOS.parallelStream()
.collect(Collectors.groupingBy(AppUrlVisitLogDTO::getAppId,Collectors.groupingBy(AppUrlVisitLogDTO::getUrl)));
for (Map.Entry<String, Map<String, List<AppUrlVisitLogDTO>>> appIdMapEntry : appIdMap.entrySet()) {
String appId = appIdMapEntry.getKey();
Map<String, List<AppUrlVisitLogDTO>> urlMap = appIdMapEntry.getValue();
for (Map.Entry<String, List<AppUrlVisitLogDTO>> urlMapEntry : urlMap.entrySet()) {
String url = urlMapEntry.getKey();
List<AppUrlVisitLogDTO> listEntry = urlMapEntry.getValue();
TaskCall taskCall = new TaskCall(appId,url,dateStr,listEntry);
executorService.submit(taskCall);
}
}
}catch (Exception e) {
log.error("appVisitJob Exception date="+dateStr,e);
}
}
class TaskCall implements Callable<Boolean> {
private String appId;
private String url;
private String dateStr;
public TaskCall(String appId, String url, String dateStr, List<AppUrlVisitLogDTO> appUrlVisitLogDTOList) {
this.appId = appId;
this.url = url;
this.dateStr = dateStr;
this.appUrlVisitLogDTOList = appUrlVisitLogDTOList;
}
private List<AppUrlVisitLogDTO> appUrlVisitLogDTOList;
public TaskCall(String appId, String url, List<AppUrlVisitLogDTO> appUrlVisitLogDTOList) {
this.appId = appId;
this.url = url;
this.appUrlVisitLogDTOList = appUrlVisitLogDTOList;
}
@Override
public Boolean call() {
StringBuffer requestSb = new StringBuffer();
StringBuffer responseSb = new StringBuffer();
int successStatus = 0;
Long totalCustomTime = 0L;
M model = null;
try {
for (AppUrlVisitLogDTO item : appUrlVisitLogDTOList) {
requestSb.append(item.getRequestBody());
responseSb.append(item.getResponseBody());
if (Objects.equals(item.getStatusCode(),"200")){
++successStatus;
}
totalCustomTime+=item.getConsumeTime();
}
model = (M) myDAO.getModel();
Integer total = appUrlVisitLogDTOList.size();
int avgCustomTime = (int) (totalCustomTime / total);
model.setSuccessNum(successStatus);
model.setFailNum(total-successStatus);
model.setRequestByte(requestSb.toString().getBytes().length);
model.setResponseByte(responseSb.toString().getBytes().length);
model.setTimeComsumeAvg(avgCustomTime);
model.setTotalNum(total);
model.setAppId(appId);
model.setUrl(url);
model.setStDate(dateStr);
myDAO.create(model);
}catch (Exception e) {
log.error("create BaseAppVisitStatisticsLogModel Exception BaseAppVisitStatisticsLogModel:"+JSON.toJSONString(model),e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}
}