Rxjs源码分析(三)--Subscription类

Subscription类代表了Observable执行过的资源,可以被回收的,下面我们看看它的构造函数和主要的方法,

1
2
3
4
5
6
// 将传进来的参数赋值
constructor(unsubscribe?: () => void) {
if (unsubscribe) {
(<any> this)._unsubscribe = unsubscribe;
}
}

在Observable中的subscribe方法中,有个add方法将Observable的执行结果作为参数传进来,其add方法就是Subscription中的,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
add(teardown: TeardownLogic): Subscription {
let subscription = (<Subscription>teardown);
// 如果teardown参数为空,则返回一个新建的Subscription
if (!(<any>teardown)) {
return Subscription.EMPTY;
}

switch (typeof teardown) {
case 'function': // 是函数时,则将函数作为参数并创建新的Subscription
subscription = new Subscription(<(() => void)>teardown);
case 'object': // 是Object类型时,主要是为了处理一些operator的特殊情况
if (subscription === this || subscription.closed || typeof subscription.unsubscribe !== 'function') {
return subscription;
} else if (this.closed) {
subscription.unsubscribe();
return subscription;
} else if (!(subscription instanceof Subscription)) {
const tmp = subscription;
subscription = new Subscription();
subscription._subscriptions = [tmp];
}
break;
default: {
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
}
}

// subscription创建一个_parentOrParents指向父级(Subscriber实例)
let { _parentOrParents } = subscription;
if (_parentOrParents === null) {
subscription._parentOrParents = this;
} else if (_parentOrParents instanceof Subscription) {
if (_parentOrParents === this) {
return subscription;
}
subscription._parentOrParents = [_parentOrParents, this];
} else if (_parentOrParents.indexOf(this) === -1) {
_parentOrParents.push(this);
} else {
return subscription;
}

// add方法可以添加child subscription
const subscriptions = this._subscriptions;
if (subscriptions === null) {
this._subscriptions = [subscription];
} else {
subscriptions.push(subscription);
}

return subscription;
}

下面我们看看Subscription的主要功能unsubscribe方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
unsubscribe(): void {
let errors: any[];

// closed 表示是否已经已经执行过unsubscribe
if (this.closed) {
return;
}
// _subscriptions变量保存当前需要被销毁的资源,包含child subscription
let { _parentOrParents, _unsubscribe, _subscriptions } = (<any> this);

this.closed = true;
// 重置以防沿着原型链修改到上级的变量
this._parentOrParents = null;
this._subscriptions = null;

// 从_parentOrParents对象中移除本次subscription中,
// remove函数(源码在下面)中移除_subscription数组中的某个元素,而_subscription就是父级上的_subscription,
// 之前this._subscriptions已经置null,这边再调用remove有意义?

if (_parentOrParents instanceof Subscription) {
_parentOrParents.remove(this);
} else if (_parentOrParents !== null) {
for (let index = 0; index < _parentOrParents.length; ++index) {
const parent = _parentOrParents[index];
parent.remove(this);
}
}

if (isFunction(_unsubscribe)) {
// 如果当前是通过new Subscription出来的待被销毁的函数,则call,表明当前Observable有返回待被unsubscribe函数
try {
_unsubscribe.call(this);
} catch (e) {
errors = e instanceof UnsubscriptionError ? flattenUnsubscriptionErrors(e.errors) : [e];
}
}

// 这边通过循环来执行subscription,包括当前的和添加的child级别的,
if (isArray(_subscriptions)) {
let index = -1;
let len = _subscriptions.length;

while (++index < len) {
const sub = _subscriptions[index];
if (isObject(sub)) {
try {
sub.unsubscribe(); // 回到当前函数来执行其中的child
} catch (e) {
errors = errors || [];
if (e instanceof UnsubscriptionError) {
errors = errors.concat(flattenUnsubscriptionErrors(e.errors));
} else {
errors.push(e);
}
}
}
}
}

if (errors) {
throw new UnsubscriptionError(errors);
}
}

remove(subscription: Subscription): void {
const subscriptions = this._subscriptions;
if (subscriptions) {
const subscriptionIndex = subscriptions.indexOf(subscription);
if (subscriptionIndex !== -1) {
subscriptions.splice(subscriptionIndex, 1);
}
}
}

Subscription通过add方法将可回收的资源添加,不管是空还是observable返回的函数,都可以通过Subscription回收,最后通过unsubscribe函数循环将可回收执行回收。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!