RxJava学习笔记

背景知识

在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。

这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。

有很多术语可用于描述这种异步编程和设计模式,在在本文里我们使用这些术语:一个观察者订阅一个可观察对象 (An observer subscribes to an Observable)。通过调用观察者的方法,Observable发射数据或通知给它的观察者。

在其它的文档和场景里,有时我们也将Observer叫做Subscriber、Watcher、Reactor。这个模型通常被称作Reactor模式。

快速上手

普通的方法调用(不是某种异步方法,也不是Rx中的并行调用),流程通常是这样的:

  • 调用某一个方法
  • 用一个变量保存方法返回的结果
  • 使用这个变量和它的新值做些有用的事
    代码描述大概是:
    1
    Object returnVal = someMethod(itsParameters);

在异步模型中流程更像这样的:

  • 定义一个方法,它完成某些任务,然后从异步调用中返回一个值,这个方法是观察者的一部分
  • 将这个异步调用本身定义为一个Observable
  • 观察者通过订阅(Subscribe)操作关联到那个Observable
  • 继续你的业务逻辑,等方法返回时,Observable会发射结果,观察者的方法会开始处理结果或结果集

下面是来自官方的Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);

myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);

输出

1
0
1
2
3
4
5
Sequence complete

取消订阅

在一些ReactiveX实现中,有一个特殊的观察者接口Subscriber,它有一个unsubscribe方法。调用这个方法表示你不关心当前订阅的Observable了,因此Observable可以选择停止发射新的数据项(如果没有其它观察者订阅)。

取消订阅的结果会传递给这个Observable的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生,然而,对一个Observable来说,即使没有观察者了,它也可以在一个while循环中继续生成并尝试发射数据项。

几个对象关系整理

Observable(观察者) 和 Subscriber(订阅者)是两个主要的类。在 RxJava 上,一个 Observable 是一个发出数据流或者事件的类,Subscriber 是一个对这些发出的 items (数据流或者事件)进行处理(采取行动)的类。一个 Observable 的标准流发出一个或多个 item,然后成功完成或者出错。一个 Observable 可以有多个 Subscribers,并且通过 Observable 发出的每一个 item,该 item 将会被发送到 Subscriber.onNext() 方法来进行处理。一旦 Observable 不再发出 items,它将会调用 Subscriber.onCompleted() 方法,或如果有一个出错的话 Observable 会调用 Subscriber.onError() 方法。

  • Observable 观察者

  • Observer 订阅者

    1
    2
    3
    4
    5
    public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
    }
  • Subscriber 订阅者

    1
    2
    3
    4
    5
    6
    7
    8
    public abstract class Subscriber<T> implements Observer<T>, Subscription {}

    - Subscription
    ```java
    public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
    }
1
- Subject
这个类比较奇葩,既是观察者(`extends Observable<R> `),又是订阅者(`implements Observer<T>`)
```java
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {}
  • Scheduler 线程调度

操作符组合

用操作符组合Observable

对于ReactiveX来说,Observable和Observer仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。

ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。

Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。

下面是常用的操作符列表:

  • 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  • 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  • 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  • 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  • 错误处理 Catch和Retry
  • 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  • 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  • 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  • 转换操作 To
  • 连接操作 Connect, Publish, RefCount, Replay
  • 反压操作,用于增加特殊的流程控制策略的操作符
    这些操作符并不全都是ReactiveX的核心组成部分,有一些是语言特定的实现或可选的模块。

操作符详细使用

创建型

from

将其它种类的对象和数据类型转换为Observable

1
2
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);

  • from可以接收的类型有:
    • from(array))
    • from(Iterable))
    • from(Future))
    • from(Future,Scheduler))
    • from(Future,timeout, timeUnit))

Interval

创建一个按固定时间间隔发射整数序列的Observable

示例:每隔100ms输出现在数字

1
Observable.interval(100, TimeUnit.MILLISECONDS).subscribe(System.out::println);

  • 用法
    • interval(long,TimeUnit))
    • interval(long,TimeUnit,Scheduler))

just

Just将单个数据转换为发射那个数据的Observable。

Just类似于From,但是From会将数组或Iterable的素具取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。

1
Observable.just(1, "text", 2).subscribe(System.out::println);

通过查看源码发现just最多支持传入10种数据类型不同的对象

1
2
3
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10));
}

range

创建一个发射特定整数序列的Observable

从0开始,递增产生100个整数(0-99)

1
Observable.range(0, 100).subscribe(System.out::println);

源码:

1
public final static Observable<Integer> range(int start, int count) {...}

Repeat

创建一个发射特定数据重复多次的Observable

1
Observable.range(2,3).repeat(2).subscribe(System.out::println);

输出:

1
2
3
4
2
3
4

Observable.range(2,3)产生2,3,4.然后repeat重复2次

timber

1
Observable.timer(100,TimeUnit.MILLISECONDS).takeLast(2).subscribe(System.out::println);

RxJava将这个操作符实现为timer函数。

timer返回一个Observable,它在延迟一段给定的时间后发射一个简单的数字0。

timer操作符默认在computation调度器上执行。有一个变体可以通过可选参数指定Scheduler。

变换操作

Buffer

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

  • buffer(count)
    1
    Observable.range(0, 10).buffer(3).subscribe(System.out::println);

输出:

1
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]

  • buffer(count, skip)
    1
    Observable.range(0, 20).buffer(2, 5).subscribe(System.out::println);

输出:

1
[0, 1]
[5, 6]
[10, 11]
[15, 16]

分析:每次输出2个,下次输出跳过5个

FlatMap

FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable

1
2
3
4
5
6
Observable.range(0, 5).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.just("observable " + integer);//每个源数据再封装成Observable发射
}
}).subscribe(System.out::println);

GroupBy

将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.range(1,10).groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer%3;
}
}).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
@Override
public void call(GroupedObservable<Integer, Integer> result) {
result.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer + "%3==" + result.getKey());
}
});
}
});

分析:将1-10按3的余数分组,GroupedObservable第一个变量为分组的key,第二个变量为当前的值

Map

对Observable发射的每一项数据应用一个函数,执行变换操作

1
2
3
4
5
6
Observable.range(0, 5).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "number " + integer;//此处将Integer变换成String
}
}).subscribe(System.out::println);

Scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果

稍微有点复杂,上图吧

1
2
3
4
5
6
Observable.range(1, 10).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(System.out::println);

输出:

1
1
3
6
10
15
21
28
36
45
55

分析:第一个数单独发射;第二次发送前两个数的运算后结果;第三为第二次运算的结果和第三个数做运算;后面类似

如果只想取最后的结果,可以用过滤操作中的takeLast操作符

Window

定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据

上图

1
public final Observable<Observable<T>> window(int count, int skip) {}
  • tip
    这个是和时间有关的,发送一定时间内的数据。加上skip操作符可以进行类似取样的操作

过滤操作

Debounce

仅在过了一段指定的时间还没发射数据时才发射一个数据

1
public final Observable<T> debounce(long timeout, TimeUnit unit) {...}
  • tip
    Debounce操作符会过滤掉发射速率过快的数据项。

适用的场景可以是在线搜索功能,间隔指定时间发送当前输入框的搜索内容

Distinct

抑制(过滤掉)重复的数据项

Distinct的过滤规则是:只允许还没有发射过的数据项通过。

  • distinctUntilChanged(Func1)
    这个方法比较有意思,和distinct(Func1)一样,根据一个函数产生的Key判定两个相邻的数据项是不是不同的。
1
Observable.just(1, 2, 2,2, 3, 2,  5, 4, 4, 5).distinctUntilChanged().subscribe(System.out::println);

输出:

1
1
2
3
2
5
4
5

通过结果我们可以看到,只有相邻的数重复时,才会被过滤掉

ElementAt

只发射第N项数据

Filter

只发射通过了谓词测试的数据项

逢七过的简单实现,输出都是危险的数字

1
2
3
4
5
6
Observable.range(1, 100).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 7 == 0 || integer % 10 == 7 || integer / 10 == 7;
}
}).subscribe(System.out::println);

First

只发射第一项(或者满足某个条件的第一项)数据

IgnoreElements

如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用ignoreElements操作符,它会确保永远不会调用观察者的onNext()方法。

Last

只发射最后一项(或者满足某个条件的最后一项)数据

Sample

定期发射Observable最近发射的数据项

Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。

在某些实现中,有一个ThrottleFirst操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据

Skip/SkipLast

抑制Observable发射的前/后N项数据

Take/TakeLast

只发射前/后面的N项数据

组合操作

And/Then/When

使用Pattern和Plan作为中介,将两个或多个Observable发射的数据集合并到一起

And/Then/When操作符组合的行为类似于zip,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到Pattern对象,然后操作那个Pattern对象,变换为一个Plan。随后将这些Plan变换为Observable的发射物。

CombineLatest

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

CombineLatest操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

  • tip
    zip一样,可以用作表单验证。或者某个控件的属性改变有其他的几个控件直接影响的场景

    Join

    任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
1
2
3
4
5
public final <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(
Observable<TRight> right,
Func1<T, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<T, TRight, R> resultSelector)
{}

Merge

合并多个Observables的发射物

使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。

Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

StartWith

在数据序列的开头插入一条指定的项

Switch

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项

Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。


1
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {}

T1和T2两个Observable结合为C类型的Observable返回

错误处理 (待整理)

Catch和Retry

辅助操作 (待整理)

Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using

条件和布尔操作 (待整理)

All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile

算术和集合操作 (待整理)

Average, Concat, C
ount, Max, Min, Reduce, Sum

转换操作 (待整理)

To

连接操作 (待整理)

Connect, Publish, RefCount, Replay

反压操作(待整理)

用于增加特殊的流程控制策略的操作符

参考

https://mcxiaoke.gitbooks.io/rxdocs/content/

文章目录
  1. 1. 背景知识
  2. 2. 快速上手
  3. 3. 取消订阅
  4. 4. 几个对象关系整理
  5. 5. 操作符组合
  6. 6. 操作符详细使用
    1. 6.1. 创建型
      1. 6.1.1. from
      2. 6.1.2. Interval
      3. 6.1.3. just
      4. 6.1.4. range
      5. 6.1.5. Repeat
      6. 6.1.6. timber
    2. 6.2. 变换操作
      1. 6.2.1. Buffer
      2. 6.2.2. FlatMap
      3. 6.2.3. GroupBy
      4. 6.2.4. Map
      5. 6.2.5. Scan
      6. 6.2.6. Window
    3. 6.3. 过滤操作
      1. 6.3.1. Debounce
      2. 6.3.2. Distinct
      3. 6.3.3. ElementAt
      4. 6.3.4. Filter
      5. 6.3.5. First
      6. 6.3.6. IgnoreElements
      7. 6.3.7. Last
      8. 6.3.8. Sample
      9. 6.3.9. Skip/SkipLast
      10. 6.3.10. Take/TakeLast
    4. 6.4. 组合操作
      1. 6.4.1. And/Then/When
      2. 6.4.2. CombineLatest
      3. 6.4.3. Join
      4. 6.4.4. Merge
      5. 6.4.5. StartWith
      6. 6.4.6. Switch
      7. 6.4.7. Zip
    5. 6.5. 错误处理 (待整理)
    6. 6.6. 辅助操作 (待整理)
    7. 6.7. 条件和布尔操作 (待整理)
    8. 6.8. 算术和集合操作 (待整理)
    9. 6.9. 转换操作 (待整理)
    10. 6.10. 连接操作 (待整理)
    11. 6.11. 反压操作(待整理)
  7. 7. 参考