Spark 自定义累加器需要实现 AccumulatorParam 接口。
!!!!!!
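需要注意的是，Spark 源码中 AccumulatorParam 为 addAccumulator 给出了默认实现，它直接调用 addInPlace：`def addAccumulator(t1: T, t2: T): T = addInPlace(t1, t2)`。
但在本例中，两个方法的语义并不相同：addAccumulator 是把某个时长区间的 key（t2）在当前累加值（t1）上加 1，而 addInPlace 是把两个分区各自的完整累加结果合并相加。也就是说，两个方法必须分别实现，不能共用同一套逻辑。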
下面是我的实现
TimeAccumulator.java
import constant.Constants;
import org.apache.spark.AccumulatorParam;
import util.StringUtils;

/**
 * Spark accumulator that counts occurrences per duration bucket.
 *
 * <p>The accumulated value is a single string of {@code bucket=count} fields joined by
 * {@code |}, one field per entry of {@link #FIELDS}, in that order, e.g.
 * {@code <S_1s_3s>=0|<S_4s_6s>=0|...|<S_30m>=0}.
 *
 * <p>The two merge operations intentionally differ:
 * <ul>
 *   <li>{@link #addAccumulator} receives a bucket <em>name</em> as its second argument and
 *       increments that bucket by one;</li>
 *   <li>{@link #addInPlace} receives two complete records and sums them field by field.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code AccumulatorParam} is deprecated since Spark 2.0 in favour of
 * {@code AccumulatorV2}; consider migrating if the Spark version allows it.
 */
public class TimeAccumulator implements AccumulatorParam<String> {

    /** Regex handed to StringUtils to split a record into its {@code bucket=count} fields. */
    private static final String DELIMITER_REGEX = "\\|";

    /** Literal separator written between fields when a record is built. */
    private static final String SEPARATOR = "|";

    /** Bucket names, in the order they appear in every record this class produces. */
    private static final String[] FIELDS = {
            Constants.S_1s_3s, Constants.S_4s_6s, Constants.S_7s_9s,
            Constants.S_10s_30s, Constants.S_30s_60s, Constants.S_1m_3m,
            Constants.S_3m_10m, Constants.S_10m_30m, Constants.S_30m
    };

    /**
     * Increments the bucket named {@code s2} in the running record {@code s1}.
     *
     * @param s1 current record; if empty or null, counting starts from {@link #zero}
     * @param s2 name of the bucket to increment
     * @return the updated record, or {@code s1} unchanged if {@code s2} is empty or is not a
     *         bucket present in the record
     */
    @Override
    public String addAccumulator(String s1, String s2) {
        // Nothing to count.
        if (StringUtils.isEmpty(s2)) {
            return s1;
        }
        // An empty running value (for instance if the accumulator was created with "" as its
        // initial value) would otherwise lose this increment; start from all zeros instead.
        String current = StringUtils.isEmpty(s1) ? zero(s1) : s1;
        String oldValue = StringUtils.getFieldFromConcatString(current, DELIMITER_REGEX, s2);
        if (oldValue == null) {
            // Unknown bucket name: leave the running value untouched.
            return s1;
        }
        int newValue = Integer.parseInt(oldValue) + 1;
        return StringUtils.setFieldInConcatString(
                current, DELIMITER_REGEX, s2, String.valueOf(newValue));
    }

    /**
     * Merges two complete records by summing each bucket.
     *
     * @param r1 first record; if empty or null, {@code r2} is returned as-is
     * @param r2 second record; if empty or null, {@code r1} is returned as-is
     * @return a new record whose count for each bucket is the sum of the two inputs; a bucket
     *         missing from one side counts as 0 for that side
     * @throws NumberFormatException if a bucket's count is present but not an integer
     */
    @Override
    public String addInPlace(String r1, String r2) {
        // Merging with an empty value is a no-op, whichever side it is on.
        if (StringUtils.isEmpty(r1)) {
            return r2;
        }
        if (StringUtils.isEmpty(r2)) {
            return r1;
        }
        int[] sums = new int[FIELDS.length];
        for (int i = 0; i < FIELDS.length; i++) {
            sums[i] = countOf(r1, FIELDS[i]) + countOf(r2, FIELDS[i]);
        }
        return buildRecord(sums);
    }

    /**
     * Returns the all-zero record; {@code initialValue} is ignored.
     *
     * @param initialValue ignored
     * @return a record with every bucket set to 0
     */
    @Override
    public String zero(String initialValue) {
        return buildRecord(new int[FIELDS.length]);
    }

    /** Reads the count stored under {@code field} in {@code record}, treating absence as 0. */
    private static int countOf(String record, String field) {
        String value = StringUtils.getFieldFromConcatString(record, DELIMITER_REGEX, field);
        return value == null ? 0 : Integer.parseInt(value);
    }

    /** Formats {@code counts} (parallel to {@link #FIELDS}) as a {@code bucket=count|...} record. */
    private static String buildRecord(int[] counts) {
        StringBuilder record = new StringBuilder();
        for (int i = 0; i < FIELDS.length; i++) {
            if (i > 0) {
                record.append(SEPARATOR);
            }
            record.append(FIELDS[i]).append('=').append(counts[i]);
        }
        return record.toString();
    }
}
StringUtils.java
package util; public class StringUtils { /** * 判断字符串是否为空 * * @param str 字符串 * @return 是否为空 */ public static boolean isEmpty(String str) { return "".equals(str) || str == null; } /** * 判断字符串是否不为空 * * @param str 字符串 * @return 是否不为空 */ public static boolean isNotEmpty(String str) { return str != null && str.length() > 0; } /** * 从拼接的字符串中提取字段 * searchKeywords=abc|clickCategoryIds=1,2,3 * * @param str 字符串 searchKeywords=abc|clickCategoryIds=1,2,3 * @param delimiter 分隔符 | * @param field 字段 clickCategoryIds * @return 字段值 1,2,3 */ public static String getFieldFromConcatString(String str, String delimiter, String field) { String[] split = str.split(delimiter); String value = null; for (String s : split) { String[] split1 = s.split("="); if(field.equals(split1[0])){ value = split1[1]; } } return value; } /** * 从拼接的字符串中给字段设置值 * searchKeywords=abc|clickCategoryIds=1,2,3 * * @param str 字符串 searchKeywords=abc|clickCategoryIds=1,2,3 * @param delimiter 分隔符 | * @param field 字段名 searchKeywords * @param newFieldValue 新的field值 efg * @return 字段值 searchKeywords=efg|clickCategoryIds=1,2,3 */ public static String setFieldInConcatString(String str, String delimiter, String field, String newFieldValue) { String[] split = str.split(delimiter); StringBuilder sb = new StringBuilder(); for (int i = 0; i < split.length; i++) { String[] split1 = split[i].split("="); if(field.equals(split1[0])){ sb.append(split1[0]); sb.append("="); sb.append(newFieldValue); sb.append("|"); continue; } sb.append(split[i]);sb.append("|"); } return sb.substring(0,sb.lastIndexOf("|")); } }
