本文会研究一下Rxjava流量控制类操作符
定义Source
为了方便测试,首先定义一个能够定义发送个数和发送间隔的Source:
private Observable<Integer> getSource(final int num, final long interval) {
return Observable.unsafeCreate(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < num; i++) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
});
}
throttleWithTimeout
当一个事件发射时,rxjava会开始计时,当后一个事件发射时,而这两个事件的时间间隔小于定义的时间间隔,就会丢弃掉前一个事件
所以特别的,我们定义的发射源为200个,间隔10ms,而我们定义的timeout为200ms,所以我们只会收到最后一个事件:
public void throttleWithTimeout() {
getSource(200, 10)
.throttleWithTimeout(200, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "throttleWithTimeout: " + integer);
}
});
}
log:
06-11 10:35:48.900 8004-8004/com.lhl.demoapplication D/RxjavaTest: throttleWithTimeout: 199
debounce
debounce同throttleWithTimeout一致,当一个事件发射时,rxjava会开始计时,当后一个事件发射时,而这两个事件的时间间隔小于定义的时间间隔,就会丢弃掉前一个事件:
public void debounce() {
getSource(200, 10)
.debounce(200, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "debounce: " + integer);
}
});
}
Log:
06-11 10:49:05.924 8268-8268/com.lhl.demoapplication D/RxjavaTest: debounce: 199
distinct
distinct的作用类似java 的set,就是去重
public void distinct() {
Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1)
.distinct()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "distinct: " + integer);
}
});
}
Log:
06-12 02:11:16.297 9340-9340/? D/RxjavaTest: distinct: 1
distinct: 2
distinct: 3
distinct: 4
distinct: 5
distinctUntilChanged
distinctUntilChanged只会对相邻的重复去重:
public void distinctUntilChanged() {
Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3)
.distinctUntilChanged()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "distinct: " + integer);
}
});
}
输出:
06-12 02:15:57.292 9553-9553/com.lhl.demoapplication D/RxjavaTest: distinct: 1
06-12 02:15:57.293 9553-9553/com.lhl.demoapplication D/RxjavaTest: distinct: 2
distinct: 3
distinct: 1
distinct: 2
distinct: 3
elementAt
elementAt返回指定位置的元素:
public void elementAt() {
Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1)
.elementAt(2)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "elementAt: " + integer);
}
});
}
输出:
06-12 02:20:22.440 9682-9682/? D/RxjavaTest: elementAt: 3
filter
filter很常用,用于返回满足条件的元素,比如返回所有偶数:
public void filter() {
getSource(10, 10)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "filter: " + integer);
}
});
}
输出:
06-12 02:23:44.048 9900-9900/? D/RxjavaTest: filter: 0
filter: 2
06-12 02:23:44.187 9900-9900/? D/RxjavaTest: filter: 4
filter: 6
filter: 8
first和last
first和last用于返回第一个或最后一个元素,或者是第一个或最后一个满足条件的元素,比较简单:
public void first() {
getSource(10, 10)
.first()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "first: " + integer);
}
});
}
public void last() {
getSource(10, 10)
.last()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "last: " + integer);
}
})
}
skip和take
skip对数据源跳过n项,take对数据源只取n项,另外还有skipLast和takeLast,从最后开始计算:
public void skip() {
getSource(10, 10)
.skip(3)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "skip: " + integer);
}
});
}
public void take() {
getSource(10, 10)
.take(3)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "take: " + integer);
}
});
}
sample、throttleFirst和throttleLast
sample、throttleFirst和throttleLast,这三个操作符都是采样,给定一个窗口期,在一个窗口期最多放行一个事件,有所区别的地方在于:sample和throttleLast返回窗口期的最后一个事件,throttleFirst返回窗口期的第一个事件
sample:
public void sample() {
getSource(100, 10)
.sample(300, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "sample: " + integer);
}
});
}
输出:
06-12 03:01:19.621 11179-11179/? D/RxjavaTest: sample: 27
06-12 03:01:19.919 11179-11179/? D/RxjavaTest: sample: 51
06-12 03:01:20.218 11179-11179/com.lhl.demoapplication D/RxjavaTest: sample: 77
06-12 03:01:20.477 11179-11179/com.lhl.demoapplication D/RxjavaTest: sample: 99
throttleFirst:
public void throttleFirst() {
getSource(100, 10)
.throttleFirst(300, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "throttleFirst: " + integer);
}
});
}
输出:
06-12 03:02:28.899 11286-11286/? D/RxjavaTest: throttleFirst: 0
06-12 03:02:29.175 11286-11286/? D/RxjavaTest: throttleFirst: 27
06-12 03:02:29.477 11286-11286/? D/RxjavaTest: throttleFirst: 53
06-12 03:02:29.782 11286-11286/com.lhl.demoapplication D/RxjavaTest: throttleFirst: 80
throttleLast:
public void throttleLast() {
getSource(100, 10)
.throttleLast(300, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("RxjavaTest", "throttleLast: " + integer);
}
});
}
输出:
06-12 03:03:23.762 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 22
06-12 03:03:23.883 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 48
06-12 03:03:24.184 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 74
06-12 03:03:24.470 11391-11391/com.lhl.demoapplication D/RxjavaTest: throttleLast: 99