日常操作:
list.Stream().collect(Collectors.counting())collect:收集器 Collector作为collect方法的参数 Collector是一个接口,它是一个可变的汇聚操作,将输入元素累积到一个可变的结果容器中;它会在所有元素都处理完毕后,将累积的结果转换为一个最终的表示(这是一个可选操作),它支持串行和并行两种方式执行。 Collectors本身提供了关于Collector的常见汇聚实现,Collectors本事实际上是一个工厂,对于Collectors静态工厂类来说,其实现一共分为两种情况:
通过CollectorImpl来实现通过reducing方法来实现,reducing本身也是通过CollectorImpl实现的Collector是由4个函数构成的
supplier:用于提供可变结果容器accumulator:用于不断的向可变结果容器中叠加流中的每一个元素combiner:用于线程并发的,将两个部分结果合并成最终的一个结果,比如有4个线程,那么生成4个部分结果1、2、3、4,combiner会将4个部分结果合并,但是是两个一起合并,如1和2可能合并成5,然后5与3合并成6,6与4合并成7,最终结果就是7,合并的过程可以是产生一个新的结果容器,如1和2合并时产生一个新的结果容器,然后1和2都添加到新的结果容器中,也可以是将2添加到1中,然后返回1。finisher:完成器,是一个可以选操作,可有可无,将累积的中间结果转换成一种最终的表示。为了确保串行与并行操作结果的等价性,Collector函数需要满足两个条件: identity(同一性) :
combiner.apply(a, supplier.get()), 其实类似于(List<Stirng> list1,List<String> list2 -> {list1.addAll(list2);return list1})associativity(结合性):
//单线程执行 A a1 = supplier.get();//获取可变的结果容器 accumulator.accept(a1, t1);//执行叠加,将t1添加到a1中 accumulator.accept(a1, t2);//执行叠加,将t2添加到a1中 R r1 = finisher.apply(a1); // 将中间结果转换成最终的结果 //多线程执行 A a2 = supplier.get(); accumulator.accept(a2, t1);//线程1 A a3 = supplier.get(); accumulator.accept(a3, t2);//线程2 R r2 = finisher.apply(combiner.apply(a2, a3)); //将两个线程的中间结果合并成一个中间结果,并将其转换成最终结果Collector<T,A,R>中三个泛型参数:
T :表示集合之中或者流当中每个元素的类型A :表示可变容器的类型R : 表示汇聚操作返回的结果类型自定义收集器:将一个List转换成Set
public class MyCollector<T> implements Collector<T, Set<T>, Set<T>> { @Override public Supplier<Set<T>> supplier() { System.out.println("exec supplier......"); //return () -> new HashSet<T>(); return HashSet<T>::new; } @Override public BiConsumer<Set<T>, T> accumulator() { System.out.println("exec accumulator....."); // return (set,item) -> set.add(item); //此处只能使用Set,而不能使用HashSet,因为Supplier方法返回的是Set,而里面具体实现返回的是HashSet,如果这里使用HashSet, // 那么当supplier方法中返回的是其他的Set时,就会相互矛盾,比如返回TreeSet,而这里返回HashSet return Set<T>::add; } @Override public BinaryOperator<Set<T>> combiner() { System.out.println("exec combiner......"); return (Set<T> item1,Set<T> item2) -> { item1.addAll(item2); return item1; }; } @Override public Function<Set<T>, Set<T>> finisher() { System.out.println("exec finisher......"); return set -> set; } @Override public Set<Characteristics> characteristics() { System.out.println("exec characteristics......"); return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH,Characteristics.UNORDERED)); } public static void main(String[] args) { List<String> list = Arrays.asList("hello","world","wellcom"); Set<String> set = list.stream().collect(new MyCollector<>()); System.out.println(set); } }Stream中的collect方法的具体实现在ReferencePipeline抽闲类中,具体实现如下:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; //并行操作 if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { //串行操作 container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); }对于串行操作首先会调用ReduceOps.makeRef(collector)方法,该方法的具体实现如下:
public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) { Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); BiConsumer<I, ? super T> accumulator = collector.accumulator(); BinaryOperator<I> combiner = collector.combiner(); class ReducingSink extends Box<I> implements AccumulatingSink<T, I, ReducingSink> { @Override public void begin(long size) { state = supplier.get(); } @Override public void accept(T t) { accumulator.accept(state, t); } @Override public void combine(ReducingSink other) { state = combiner.apply(state, other.state); } } return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; } }; }makeRef方法中首先会调用Collector中定义的supplier、accumulator、combiner方法,同时会getOpFlags方法中判断Collector中的Characteristics方法执行完成后是否包含UNORDERED,所以这里会执行Collector中characteristics方法。 在collect方法执行结束时再次执行characteristics方法,判断是否设置Characteristics.IDENTITY_FINISH 属性,该属性如果设置,则说明中间结果与返回结果是一样的,即finisher方法中的返回的就是输入参数,所以直接强制类型转换® container,将中间结果转换成需要的返回类型,如果没有设置Characteristics.IDENTITY_FINISH ,那么就会执行finisher方法中返回的函数式接口,将中间结果转换成最终的返回类型。
自定义收集器:将set转换成map,set中的元素即作为key也作为value
public class MyCollector2<T> implements Collector<T, Set<T>,Map<T,T>> { @Override public Supplier<Set<T>> supplier() { System.out.println("exec supplier"); return HashSet::new; } @Override public BiConsumer<Set<T>,T> accumulator() { System.out.println("exec accumulator"); BiConsumer<Set<T>, T> setTBiConsumer = (set,item) -> { System.out.println("accumulator set value is : " +set + "," + Thread.currentThread().getName() ); set.add(item); }; return setTBiConsumer; } @Override public BinaryOperator<Set<T>> combiner() { System.out.println("exec combiner"); return (set1, set2) -> { System.out.println("------------"); set1.addAll(set2); return set1; }; } @Override public Function<Set<T>, Map<T, T>> finisher() { System.out.println("exec finisher"); return (set) -> { Map<T,T> map = new HashMap<>(); set.stream().forEach(item ->map.put(item,item)); return map; }; } @Override public Set<Characteristics> characteristics() { System.out.println("exec characteristics"); return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED,Characteristics.CONCURRENT)); } public static void main(String[] args) { for (int i=0;i<100;i++){ Set<String> set = new HashSet<>(Arrays.asList("hello","world","hello world","a","b","c","d","e","f","g")); Map<String,String> map = set.parallelStream().collect(new MyCollector2<>()); System.out.println(map); } } }上述示例使用并行流操作,同时将Collector的Characteristics设置为CONCURRENT(表示并行),该程序可能会报错(java.util.ConcurrentModificationException: java.util.ConcurrentModificationException),CONCURRENT属性表明多个线程对同一个结果容器进行操作,设置该属性之后不能在结果容器进行额外的操作,比如上述示例中对set进行打印,这里是调用了Set的toString方法,会遍历Set中的元素,同时遍历Set,同时修改Set则是不允许的,所以直接报错。CONCURRENT属性的设置既然是多个线程对同一个结果容器操作,那么combiner方法返回的函数式接口代码就不会执行,因为combiner中的函数式接口只对不同的结果容器进行合并操作,如果是同一个结果容器则无需进行合并。 如果没有设置CONCURRENT属性,只是用并行流进行操作,则是每个线程都有自己的结果容器,supplier方法中返回的函数式接口会执行多次,生成多个结果容器,各个线程生成的结果容器是相互不干扰的,所以不会报异常
Collectors中代码: groupingBy()代码
public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,//这里的泛型K是分类器返回结果类型,表示的最终返回的Map的key是K类型的,T是流中的元素 Supplier<M> mapFactory, Collector<? super T, A, D> downstream) { Supplier<A> downstreamSupplier = downstream.supplier(); BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator(); //构造新的accumulator,依赖于downstream下游收集器的accumulator,新的accumulator中间结果容器编程了Map<K, A>类型,其中键是K类型了,value值是下游收集器的中间结果类型 BiConsumer<Map<K, A>, T> accumulator = (m, t) -> { K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); A container = m.computeIfAbsent(key, k -> downstreamSupplier.get()); downstreamAccumulator.accept(container, t); }; BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner()); @SuppressWarnings("unchecked") Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory; if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID); } else { @SuppressWarnings("unchecked") Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher(); Function<Map<K, A>, M> finisher = intermediate -> { intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v)); @SuppressWarnings("unchecked") M castResult = (M) intermediate; return castResult; }; return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID); } }partitioningBy()代码
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T, A, D> downstream) { BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator(); BiConsumer<Partition<A>, T> accumulator = (result, t) -> downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t); BinaryOperator<A> op = downstream.combiner(); BinaryOperator<Partition<A>> merger = (left, right) -> new Partition<>(op.apply(left.forTrue, right.forTrue), op.apply(left.forFalse, right.forFalse)); Supplier<Partition<A>> supplier = () -> new Partition<>(downstream.supplier().get(), downstream.supplier().get()); if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { return new CollectorImpl<>(supplier, accumulator, merger, CH_ID); } else { Function<Partition<A>, Map<Boolean, D>> finisher = par -> new Partition<>(downstream.finisher().apply(par.forTrue), downstream.finisher().apply(par.forFalse)); return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID); } }