RxJava——3.变换操作符

mac2024-10-26  53

Map()

作用 对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件应用场景 数据类型转换 // 采用RxJava基于事件流的链式操作 Observable.create(new ObservableOnSubscribe<Integer>() { // 1. 被观察者发送事件 = 参数为整型 = 1、2、3 @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } // 2. 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型 }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "使用 Map变换操作符 将事件" + integer + "的参数从 整型" + integer + " 变换成 字符串类型" + integer; } }).subscribe(new Consumer<String>() { // 3. 观察者接收事件时,是接收到变换后的事件 = 字符串类型 @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });

FlatMap()

作用:将被观察者发送的事件序列进行拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送原理 为事件序列中每个事件都创建一个 Observable 对象;将对每个原始事件转换后的新事件都放入到对应 Observable对象;将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer) 应用场景 无序的将被观察者发送的整个事件序列进行变换 // 采用RxJava基于事件流的链式操作 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } // 采用flatMap()变换操作符 }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { final List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("我是事件 " + integer + "拆分后的子事件" + i); // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件 // 最终合并,再发送给被观察者 } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });

ConcatMap()

作用:类似FlatMap()操作符与FlatMap()的区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序

Buffer()

作用 定期从被观察者(Obervable)需要发送的事件中获取一定数量的事件 & 放到缓存区中,最终发送应用场景 缓存被观察者发送的事件 // 被观察者 需要发送5个数字 Observable.just(1, 2, 3, 4, 5) .buffer(3, 1) // 设置缓存区大小 & 步长 // 缓存区大小 = 每次从被观察者中获取的事件数量 // 步长 = 每次获取新事件的数量 .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<Integer> stringList) { // Log.d(TAG, " 缓存区里的事件数量 = " + stringList.size()); for (Integer value : stringList) { Log.d(TAG, " 事件 = " + value); } } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应" ); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });
最新回复(0)