本文会研究一下Rxjava流量控制类操作符

定义Source

为了方便测试,首先定义一个能够定义发送个数和发送间隔的Source:

private Observable<Integer> getSource(final int num, final long interval) {
        return Observable.unsafeCreate(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < num; i++) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(i);
                    }
                    try {
                        Thread.sleep(interval);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
                subscriber.onCompleted();
            }
        });
    }

throttleWithTimeout

当一个事件发射时,rxjava会开始计时,当后一个事件发射时,而这两个事件的时间间隔小于定义的时间间隔,就会丢弃掉前一个事件

所以特别的,我们定义的发射源为200个,间隔10ms,而我们定义的timeout为200ms,所以我们只会收到最后一个事件:

public void throttleWithTimeout() {
        getSource(200, 10)
                .throttleWithTimeout(200, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "throttleWithTimeout: " + integer);
                    }
                });
    }

log:

06-11 10:35:48.900 8004-8004/com.lhl.demoapplication D/RxjavaTest: throttleWithTimeout: 199

debounce

debounce同throttleWithTimeout一致,当一个事件发射时,rxjava会开始计时,当后一个事件发射时,而这两个事件的时间间隔小于定义的时间间隔,就会丢弃掉前一个事件:

public void debounce() {
        getSource(200, 10)
                .debounce(200, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "debounce: " + integer);
                    }
                });
    }

Log:

06-11 10:49:05.924 8268-8268/com.lhl.demoapplication D/RxjavaTest: debounce: 199

distinct

distinct的作用类似java 的set,就是去重

public void distinct() {
        Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1)
                .distinct()
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "distinct: " + integer);
                    }
                });
    }

Log:

06-12 02:11:16.297 9340-9340/? D/RxjavaTest: distinct: 1
    distinct: 2
    distinct: 3
    distinct: 4
    distinct: 5

distinctUntilChanged

distinctUntilChanged只会对相邻的重复去重:

    public void distinctUntilChanged() {
        Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3)
                .distinctUntilChanged()
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "distinct: " + integer);
                    }
                });
    }

输出:

06-12 02:15:57.292 9553-9553/com.lhl.demoapplication D/RxjavaTest: distinct: 1
06-12 02:15:57.293 9553-9553/com.lhl.demoapplication D/RxjavaTest: distinct: 2
    distinct: 3
    distinct: 1
    distinct: 2
    distinct: 3

elementAt

elementAt返回指定位置的元素:

    public void elementAt() {
        Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1)
                .elementAt(2)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "elementAt: " + integer);
                    }
                });
    }

输出:

06-12 02:20:22.440 9682-9682/? D/RxjavaTest: elementAt: 3

filter

filter很常用,用于返回满足条件的元素,比如返回所有偶数:

    public void filter() {
        getSource(10, 10)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "filter: " + integer);
                    }
                });
    }

输出:

06-12 02:23:44.048 9900-9900/? D/RxjavaTest: filter: 0
    filter: 2
06-12 02:23:44.187 9900-9900/? D/RxjavaTest: filter: 4
    filter: 6
    filter: 8

first和last

first和last用于返回第一个或最后一个元素,或者是第一个或最后一个满足条件的元素,比较简单:

    public void first() {
        getSource(10, 10)
                .first()
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "first: " + integer);
                    }
                });
    }
    public void last() {
        getSource(10, 10)
                .last()
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "last: " + integer);
                    }
                })
    }

skip和take

skip对数据源跳过n项,take对数据源只取n项,另外还有skipLast和takeLast,从最后开始计算:

    public void skip() {
        getSource(10, 10)
                .skip(3)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "skip: " + integer);
                    }
                });
    }
    public void take() {
        getSource(10, 10)
                .take(3)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "take: " + integer);
                    }
                });
    }

sample、throttleFirst和throttleLast

sample、throttleFirst和throttleLast,这三个操作符都是采样,给定一个窗口期,在一个窗口期最多放行一个事件,有所区别的地方在于:sample和throttleLast返回窗口期的最后一个事件,throttleFirst返回窗口期的第一个事件

sample:

    public void sample() {
        getSource(100, 10)
                .sample(300, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "sample: " + integer);
                    }
                });
    }

输出:

06-12 03:01:19.621 11179-11179/? D/RxjavaTest: sample: 27
06-12 03:01:19.919 11179-11179/? D/RxjavaTest: sample: 51
06-12 03:01:20.218 11179-11179/com.lhl.demoapplication D/RxjavaTest: sample: 77
06-12 03:01:20.477 11179-11179/com.lhl.demoapplication D/RxjavaTest: sample: 99

throttleFirst:

    public void throttleFirst() {
        getSource(100, 10)
                .throttleFirst(300, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "throttleFirst: " + integer);
                    }
                });
    }

输出:

06-12 03:02:28.899 11286-11286/? D/RxjavaTest: throttleFirst: 0
06-12 03:02:29.175 11286-11286/? D/RxjavaTest: throttleFirst: 27
06-12 03:02:29.477 11286-11286/? D/RxjavaTest: throttleFirst: 53
06-12 03:02:29.782 11286-11286/com.lhl.demoapplication D/RxjavaTest: throttleFirst: 80

throttleLast:

    public void throttleLast() {
        getSource(100, 10)
                .throttleLast(300, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("RxjavaTest", "throttleLast: " + integer);
                    }
                });
    }

输出:

06-12 03:03:23.762 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 22
06-12 03:03:23.883 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 48
06-12 03:03:24.184 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 74
06-12 03:03:24.470 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 99