《深入浅出 RxJS》笔记(一)

创建类操作符

创建类操作符,也是用于创建 Observable 对象的方法;但与其它类型的操作符不一样的是,创建类操作符不依赖于其他 Observable 对象,可以凭空或根据其他数据源创造 Observable 对象。

创建类操作符,有同步和异步之分:

创建同步数据流

对于同步数据流,或同步 Observable 对象,数据之间没有时间间隔。

create

1
2
3
Observable.create = (subscribe) => {
return new Observable(subscribe);
}

create 操作符其实与 Observable 构造函数并没太大区别,一般情况也用不上它。

of

1
2
3
import 'rxjs/add/observable/of';

import { of } from 'rxjs/Observable/of';

of 操作符用于创建指定数据集合的 Observable 对象, 产生的是 Cold Observable。

V4 中,of 操作符叫 just

range

1
2
3
import { range } from 'rxjs/Observable/range';

import 'rxjs/add/observable/range';

range 操作符用于产生指定范围内的连续数字序列(每次递增 1)。

1
Observable.range(start, length)
  • start:开始数字
  • length:数字数列的长度

generate

1
2
3
import { generate } from 'rxjs/Observable/generate';

import 'rxjs/add/observable/generate';

generate 操作符类似于 for 循环,设定一个初始值,每次对初始值进行操作,直到满足指定条件时才结束,同时也可以对当前值进行一系列操作。

1
2
3
4
5
6
7
8
9
10
for(let i = 0; i < 10; i++) {
console.log(i*i);
}

const source$ = generate(
0, // 初始化,相当于 for 循环的 i = 0
i => i < 10, // 结束判断,相当于 for 循环的 i < 10
i => i++, // 相当于 for 循环的 i++
i => i * i // 相当于 for 循环内的执行代码
)
1
2
3
4
5
6
const source$ = generate(
'',
str => str.length < 10,
str => `${str}-`,
str => str
)

generate 函数包含四个参数,除第一个参数是具体的值外,其余三个参数都是纯函数。

repeat

1
2
3
import { repeat } from 'rxjs/operators/repeat';

import 'rxjs/add/operator/repeat';

repeat 操作符与其它创建类操作符(create、of、range 等)不一样,repeat 操作符是实例操作符,并非静态操作符。它的功能是重复其它 Observable 对象数据。

1
repean(count);

不传 count 或 count 为负数时表示无限重复

repeat 操作符具体的过程是:首先 subscribe 需要重复的 Observable 对象,然后将数据传输下一个管道;当需要重复多次时,会在每次将待重复的数据传输完后取消订阅,然后再次订阅该 Observable 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const { Observable } = require('rxjs');

const source$ = Observable.create(observer => {
console.info('subscribe');
setTimeout(() => observer.next(1), 1000);
setTimeout(() => observer.next(2), 2000);
setTimeout(() => observer.next(3), 3000);
setTimeout(() => observer.complete(), 4000);

return {
unsubscribe: () => {
console.info('unsubscribe');
}
};
});

const repeat$ = source$.repeat(2);

repeat$.subscribe(
console.log,
null,
_ => console.info('complete')
);

Demo

V4 中 repeat 是静态操作符并只能重复一个元素

1
2
> Rx.Observable.repeat(item, count);
>

>

item 表示需要重复的元素,count 表示重复次数

empty

empty 操作符创建一个直接 complete 的 Observable 对象,不产生任何数据。

1
const source$ = Observable.empty();

mark

throw

1
2
3
import 'rxjs/add/observable/throw';

import { _throw } from 'rxjs/observable/throw';

throw 操作符创建一个直接出错的 Observable 对象,抛出的错误就是 throw 的参数。

1
_throw(new Error('error'));

mark

never

never 操作符创建一个三不(不输出数据、不完成、不报错) Observable 对象。

mark

创建异步数据流

异步数据流,也就是异步 Observable 对象,需要考虑数据之间的间隔问题。

interval

interval 操作符与 JS 中 setInterval 类似,用于创建一个能按指定的时间间隔输出递增的整数序列(从 0 开始)的异步 Observable 对象,定时产生一个数据。

1
Observable.interval(time);

mark

timer

timer 操作符与 JS 中 setTimeout 类似,用于创建一个到达指定时间间隔后输出一个数据(0)的 Observable 对象。

1
Observable.timer(time)

mark

time 也可以为 Date 对象,表示到达指定时间后创建 Observable 对象。

mark

另外,timer 操作符还支持第二个参数,指定第二个参数后就能产生一个可以持续输出数据(递增整数列)的 Observable 对象,类似于 interval 操作符。第二个参数指定的是持续输出数据之间的时间间隔。

mark

如果第一个参数和第二个参数一样,那么就和 interval 操作符功能一样了。

1
2
3
Observable.timer(time, time)

Observable.interval(time)

from

from 操作符用于将可遍历数据结构(如:数组、类数组、generator、promise 等)转化为 Observable 对象,转化后的 Observable 对象为同步对象。

mark

from 操作符还支持转化 Observable 对象,但意义不大。

mark

fromPromise

fromPromise 操作符其实与 from(promise) 效果一样,Promise 状态为 fullfilled 时,创建的 Observable 对象会输出对应的结果,并立刻完成;如果 Promise 状态为 rejected,那么 Observable 对象也会触发相应的 error 事件。

markmark

V4 中 fromPromise 不仅支持 Promise 对象作为参数,还支持返回 Promise 对象的函数作为参数

fromEvent

fromEvent 操作符通常用于将 DOM 事件转换为 Observable 对象数据。

1
Observable.fromEvent(element, eventName)
1
2
3
4
5
const event$ = Observable.fromEvent(document.getElementsByTagName('button'), 'click');

event$.subscribe((event)=> {
console.info(event.target);
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// node EventEmitter
const emitter = new EventEmitter();
const event$ = Observable.fromEvent(emitter, 'msg');

event$.subscribe(
console.log,
error => console.log('catch', error),
() => console.log('complete')
);

emitter.emit('msg', 1); // 1
emitter.emit('msg', 2); // 2
emitter.emit('another-msg', 'Oops'); // 不输出
emitter.emit('msg', 3); // 3

fromEventPattern

fromEvent 操作符只能将 DOM 事件或 Node 中的 EventEmitter 转换为 Observable 对象,但事件源并不只有这两种,因此需要跟灵活些的操作符 fromEventPattern。

fromEventPattern 操作符接收两个函数参数,分别对应创建后的 Observable 对象被订阅(subscribe)和退订(unsunscribe)时的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const emitter = new EventEmitter();

const subscribeEvent = (cb) => emitter.addListener('msg', cb);
const unsunscribeEvent = (cb) => emitter.removeListener('msg', cb);

const event$ = Observable.fromEventPattern(subscribeEvent, unsunscribeEvent);
const subscription$ = event$.subscribe(
console.log,
error => console.log('catch', error),
() => console.log('complete')
);

emitter.emit('msg', 1); // 1
emitter.emit('msg', 2); // 2

subscription.unsubscribe();
emitter.emit('msg', 3); // 不输出

ajax

ajax 操作符用于根据 Ajax 请求结果创建 Observable 对象。

1
Observable.ajax(url, options);
1
2
3
4
Observable.ajax('https://api.github.com/repos/ReactiveX/rxjs', { responseType: 'json' })
.subscribe(res => {
const starCount = res.response.stargazers_count;
});

repeatWhen

repeat 操作符可以重复订阅 Observable,却无法控制订阅的时间,但 repeatWhen 操作符可以满足。

1
repeatWhen(notifier: function(notifications: Observable): Observable): Observable

repeatWhen 操作符接收一个函数作为参数,这个函数返回一个作为控制器的 Observable(其实是 Subject)。若待重复的 Observable 处于 complete 状态,会通知控制器 Observable 且 notifications 输出一个数据,然后退订并重新订阅待重复的 Observable,如此循环往复;只有当控制器 Observable 处于 complete 或 error 状态时,才会停止。

1
2
3
4
5
6
const notifier = (notification$) => {
notification$.subscribe(x => console.log(x));
return Rx.Observable.timer(1000);
};

Observable.of(1, 2, 3).repeatWhen(notifier);

mark

1
2
3
const notifier = (notification$) => notification$.delay(2000);

Observable.of(1, 2, 3).repeatWhen(notifier);

mark

defer

通过创建操作符创建的源 Observable 需要占用资源,有时候还需要外部的资源(如:fromEvent、ajax)。为了避免资源浪费,需要在创建时不分配资源,只有在订阅时才分配,这就是 defer 操作符所做的事。

defer 操作符会创建一个不分配资源的代理 Observable,在被订阅时创建真正占用资源的 Observable,然后代理 Observable 把所有工作转交给真正占用资源的 Observable。

1
static defer(observableFactory: function(): SubscribableOrPromise): Observable

defer() 接受一个函数参数,当 defer 产生的代理 Observable 被订阅时就调用函数参数。这个函数参数返回真正占用资源的 Observable 对象,也支持返回 Promise 对象,当返回的是 Promise 时,不需要通过 fromPromise 转化。

1
2
3
4
5
6
7
8
9
10
11
const observableFactory = () => {
if (Math.random() > 0.5) {
return Observable.fromEvent(document, 'click');
} else {
return Observable.ajax(url);
}
}

const source$ = Observable.defer(observableFactory);

source$.subscribe(x => console.log(x));