前面我们分析了Observable类,大致了解了Observable类的执行时间以及如何执行的。
从Observable的Subscribe函数中,调用了函数toSubscriber返回Subscriber对象,我们看看该函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| // 如何参数提供的是Subscriber对象,则返回该对象, // 否则将next, error, complete参数传进Subscriber类中去创建新的对象。
export function toSubscriber<T>( nextOrObserver?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Subscriber<T> {
if (nextOrObserver) { if (nextOrObserver instanceof Subscriber) { return (<Subscriber<T>> nextOrObserver); }
if (nextOrObserver[rxSubscriberSymbol]) { return nextOrObserver[rxSubscriberSymbol](); } }
if (!nextOrObserver && !error && !complete) { return new Subscriber(emptyObserver); }
return new Subscriber(nextOrObserver, error, complete); }
|
看看Subscriber的构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { super(); // 调用父类构造函数,继承至Subscription,从前面的类图可以看出
switch (arguments.length) { case 0: // 不提供参数,则将emptyObserver赋值, empty是一个对象包含next,error,complete三个空函数, // 和一个closed为true的bool变量 this.destination = emptyObserver; break;
//下面代码的主要功能是: 给destination变量赋值, 如果参数提供的是Object 则判断是否是Subscriber实例,是则赋值,否则将// new SafeSubscriber实例并赋值,所以destination变量也是Subscriber实例。SafeSubscriber类等下我们分析其作用 case 1: if (!destinationOrNext) { this.destination = emptyObserver; break; } if (typeof destinationOrNext === 'object') { if (destinationOrNext instanceof Subscriber) { this.syncErrorThrowable = destinationOrNext.syncErrorThrowable; this.destination = destinationOrNext; destinationOrNext.add(this); } else { this.syncErrorThrowable = true; this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext); } break; } default: this.syncErrorThrowable = true; this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete); break; } }
|
下面看看Subscriber中的next,error,complete方法,同时内部调用了_next, _error, _complete方法。
为何需要这么麻烦,还要再次定义_next,_error,_complete这三个内部使用的方法,应该是为了代码复用和功能内聚吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| next(value?: T): void { if (!this.isStopped) { this._next(value); } } error(err?: any): void { if (!this.isStopped) { this.isStopped = true; this._error(err); } } complete(): void { if (!this.isStopped) { this.isStopped = true; this._complete(); } }
protected _next(value: T): void { this.destination.next(value); }
protected _error(err: any): void { this.destination.error(err); this.unsubscribe(); }
protected _complete(): void { this.destination.complete(); this.unsubscribe(); }
|
下面我们看看SafeSubscriber类,该类继承至Subscriber,先来看看构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| // SafeSubscriber中同时保存赋值_parentSubscriber变量,该变量保存父类Subscriber的实例
constructor(private _parentSubscriber: Subscriber<T>, observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { super();
let next: ((value: T) => void); let context: any = this;
// 如果observerOrNext是function,则将next赋值给this._next if (isFunction(observerOrNext)) { next = (<((value: T) => void)> observerOrNext); } else if (observerOrNext) {
// 如果observerOrNext是对象,则分别将next,error,complete三个属性方法赋值 next = (<PartialObserver<T>> observerOrNext).next; error = (<PartialObserver<T>> observerOrNext).error; complete = (<PartialObserver<T>> observerOrNext).complete;
if (observerOrNext !== emptyObserver) { context = Object.create(observerOrNext); // 同时将context对象的__proto__指向该observerOrNext对象 if (isFunction(context.unsubscribe)) { this.add(<() => void> context.unsubscribe.bind(context)); } context.unsubscribe = this.unsubscribe.bind(this); } }
this._context = context; this._next = next; this._error = error; this._complete = complete; }
|
下面分析下SafeSubscriber中的next 和 __tryOrUnsub方法,error和complete方法主要功能就是throw error 和资源回收,具体代码就不分析了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
next(value?: T): void { if (!this.isStopped && this._next) { const { _parentSubscriber } = this; if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) { this.__tryOrUnsub(this._next, value); } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) { this.unsubscribe(); } } }
private __tryOrUnsub(fn: Function, value?: any): void { try { fn.call(this._context, value); } catch (err) { this.unsubscribe(); if (config.useDeprecatedSynchronousErrorHandling) { throw err; } else { hostReportError(err); } } }
|
从以上分析可以看出,Subscriber负责接收参数和转换,并统一对外接口和决定当前程序是否结束,真正执行的订阅者的函数是在SafeSubscriber类中。