Rxjs源码分析(一)--Observable类

Observable类中主要方法分析,一般从最简单的创建Observable,并订阅相应的Observable的例子开始分析整个Observable的大概流程,

如例子:

1
2
3
4
5
6
7
const o$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
});

const o$.subscribe(x => console.log(x));

下面开始看源码:

1
2
3
4
5
6
7
8
9
10
11
12
// Observable 构造函数

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
// 只有一个参数subscribe,如果有参数就赋值给内部变量_subscribe
}
// 提供create静态函数create,创建新的observable
static create: Function = <T>(subscribe?: (subscriber: Subscriber<T>) => TeardownLogic) => {
return new Observable<T>(subscribe);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Observable中的subscribe函数

subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {

const { operator } = this; // operator 表示pipe中的操作符,后面会细看
const sink = toSubscriber(observerOrNext, error, complete); //将当前传进来的参数转变成Subscriber对象

if (operator) {
sink.add(operator.call(sink, this.source)); //Subscriber对象继承自Subscription,所以具有add方法
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
//add方法的作用是将Observable资源执行的结果添加到即将被销毁的队列中,后面我们可以再细看add
// 这边不管三目运算符是true还是false,最终都会执行this._subscribe这个函数,即在创建Observable
// 时传进来的函数参数。那么下面类自带的_subscribe函数什么时候执行呢,当在pipe中传operator对
// observable进行操作时。
}

// 分析Observable的执行过程 ,对于error warning handle 我们可以先暂时忽略
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}

return sink;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Observable 提供的类内部调用的函数

_subscribe(subscriber: Subscriber<any>): TeardownLogic {
const { source } = this;
return source && source.subscribe(subscriber);
}

_trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
// 执行了Observable创建时传进来的函数参数,现在开始执行函数,同时将subscribe传进的
// next,error,complete转变成Subscriber对象作为参数传进来
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
sink.syncErrorThrown = true;
sink.syncErrorValue = err;
}
if (canReportError(sink)) {
sink.error(err);
} else {
console.warn(err);
}
}
}

创建Observable的时候,函数并没有执行,只是将函数赋值给个变量,当订阅者开始订阅subscribe的时候才开始执行函数,所以Observable是惰性的,这个特性像函数一样,
同时订阅者也不关心什么时候返回值,我把参数callback传给你了,当你return值的时候,调用callback即可,和promise差不多,所以适合做异步运算。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!