Rxjs源码分析(二)--Subscriber类

前面我们分析了Observable类,大致了解了Observable类的执行时间以及如何执行的。
从Observable的Subscribe函数中,调用了函数toSubscriber返回Subscriber对象,我们看看该函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 如何参数提供的是Subscriber对象,则返回该对象,
// 否则将next, error, complete参数传进Subscriber类中去创建新的对象。

export function toSubscriber<T>(
nextOrObserver?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscriber<T> {

if (nextOrObserver) {
if (nextOrObserver instanceof Subscriber) {
return (<Subscriber<T>> nextOrObserver);
}

if (nextOrObserver[rxSubscriberSymbol]) {
return nextOrObserver[rxSubscriberSymbol]();
}
}

if (!nextOrObserver && !error && !complete) {
return new Subscriber(emptyObserver);
}

return new Subscriber(nextOrObserver, error, complete);
}

看看Subscriber的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super(); // 调用父类构造函数,继承至Subscription,从前面的类图可以看出

switch (arguments.length) {
case 0:
// 不提供参数,则将emptyObserver赋值, empty是一个对象包含next,error,complete三个空函数,
// 和一个closed为true的bool变量
this.destination = emptyObserver;
break;

//下面代码的主要功能是: 给destination变量赋值, 如果参数提供的是Object 则判断是否是Subscriber实例,是则赋值,否则将// new SafeSubscriber实例并赋值,所以destination变量也是Subscriber实例。SafeSubscriber类等下我们分析其作用
case 1:
if (!destinationOrNext) {
this.destination = emptyObserver;
break;
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
this.destination = destinationOrNext;
destinationOrNext.add(this);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
}
break;
}
default:
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
break;
}
}

下面看看Subscriber中的next,error,complete方法,同时内部调用了_next, _error, _complete方法。
为何需要这么麻烦,还要再次定义_next,_error,_complete这三个内部使用的方法,应该是为了代码复用和功能内聚吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  // 这边isStopped变量是为了判断,如果当前已经调用了error,或者compete方法后,再次调用next是不起作用的,
// 所以需要该变量来作判断
next(value?: T): void {
if (!this.isStopped) {
this._next(value);
}
}
error(err?: any): void {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
}
}
complete(): void {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
}
}

// 源码采用的ts写的,protected关键字,作用是该方法仅能被该类和子类访问,具体可以看ts官网
// destination在构造函数中已经赋值,所以下面一般将执行到SafeSubscriber中
protected _next(value: T): void {
this.destination.next(value);
}

protected _error(err: any): void {
this.destination.error(err);
this.unsubscribe();
}

protected _complete(): void {
this.destination.complete();
this.unsubscribe();
}

下面我们看看SafeSubscriber类,该类继承至Subscriber,先来看看构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// SafeSubscriber中同时保存赋值_parentSubscriber变量,该变量保存父类Subscriber的实例

constructor(private _parentSubscriber: Subscriber<T>,
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();

let next: ((value: T) => void);
let context: any = this;

// 如果observerOrNext是function,则将next赋值给this._next
if (isFunction(observerOrNext)) {
next = (<((value: T) => void)> observerOrNext);
} else if (observerOrNext) {

// 如果observerOrNext是对象,则分别将nexterrorcomplete三个属性方法赋值
next = (<PartialObserver<T>> observerOrNext).next;
error = (<PartialObserver<T>> observerOrNext).error;
complete = (<PartialObserver<T>> observerOrNext).complete;

if (observerOrNext !== emptyObserver) {
context = Object.create(observerOrNext); // 同时将context对象的__proto__指向该observerOrNext对象
if (isFunction(context.unsubscribe)) {
this.add(<() => void> context.unsubscribe.bind(context));
}
context.unsubscribe = this.unsubscribe.bind(this);
}
}

this._context = context;
this._next = next;
this._error = error;
this._complete = complete;
}

下面分析下SafeSubscriber中的next 和 __tryOrUnsub方法,error和complete方法主要功能就是throw error 和资源回收,具体代码就不分析了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 从Subscriber中protected _next方法中调用了destination.next方法,一般调用到这边,

next(value?: T): void {
// isStopped是父类Subscriber中的变量, _next是在构造函数中赋值了
// 如果程序没有出错或者没有完成,并且_next有值,则将调用__tryOrUnsub方法。
if (!this.isStopped && this._next) {
const { _parentSubscriber } = this;
if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._next, value);
} else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
this.unsubscribe();
}
}
}

private __tryOrUnsub(fn: Function, value?: any): void {
try {
// fn则就是订阅者传进来的next方法,value就是Observable中产生的值,
// this._context,如果订阅者传进的参数是Object对象,则this.context的__proto__是该Object对象,
// 如果传进的参数是方法,则this._context则是该SafeSubscriber对象
fn.call(this._context, value);
} catch (err) {
// error 处理
this.unsubscribe();
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
hostReportError(err);
}
}
}

从以上分析可以看出,Subscriber负责接收参数和转换,并统一对外接口和决定当前程序是否结束,真正执行的订阅者的函数是在SafeSubscriber类中。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!