Rxjs源码分析(四)--Subject类

Subject继承至Observable, 是一种特殊类型的Observable,支持同时输出值到多个订阅者,自己维护一个订阅者序列,就像是观察者模式。

下面我们看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 构造函数直接调用的Observable构造函数,本身并没有什么初始化内容
constructor() {
super();
}

// this.observers是订阅者队列,Subject对象调用next方法时,则将值传播给每个订阅者对象。
// 同理error,complete方法也是一样
next(value?: T) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value);
}
}
}

// Subject继承自Observable,当Subject对象调用Subscribe方法时,则通过原型链找到Observable上的方法Subscribe,而Subscribe方法
// 内部则调用了this._subscribe方法(具体可以看Observable类源码分析),
// 通过下面的方法,则将订阅者push到this.observers数组中。
_subscribe(subscriber: Subscriber<T>): Subscription {
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.isStopped) {
subscriber.complete();
return Subscription.EMPTY;
} else {
this.observers.push(subscriber);
return new SubjectSubscription(this, subscriber);
}
}

由于Subject自带next, error, complete等方法,则Subject对象可以作为参数传进Observable.subscribe方法中,
如:observable$.subscribe(new Subject());

Subject还有3中类型的传播值的方式,这三种都extends自Subject类

1.BehaviorSubject
储存最新的值,并将最新的值发给订阅者,一旦订阅者subscribe,则将会收到最新存储的值,下面我们看看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 构造函数中赋值了个内部变量_value保存当前的值
constructor(private _value: T) {
super();
}


_subscribe(subscriber: Subscriber<T>): Subscription {
const subscription = super._subscribe(subscriber); // 将当前subscriber加入到observers队列中
if (subscription && !(<SubscriptionLike>subscription).closed) {
subscriber.next(this._value); // 调用subscriber的next方法,并传入当前值
}
return subscription;
}

next(value: T): void {
super.next(this._value = value); 调用Subject的next方法,将当前值多播
}

2.AsyncSubject
当complete结束时,将Subject最后的值传给订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
_subscribe(subscriber: Subscriber<any>): Subscription {
if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.hasCompleted && this.hasNext) {
subscriber.next(this.value); //如果当前是已经结束的状态,则传值给订阅者
subscriber.complete();
return Subscription.EMPTY;
}
return super._subscribe(subscriber);
}

next(value: T): void {
if (!this.hasCompleted) { // 当前如果没有结束,则将最新的值重新赋值
this.value = value;
this.hasNext = true;
}
}

// 调用该方法则执行结束,将hasCompleted变量至true,并将最新的值多播出去
complete(): void {
this.hasCompleted = true;
if (this.hasNext) {
super.next(this.value);
}
super.complete();
}

3.ReplaySubject
主要功能是当订阅者subscribe时候,可以获取最新的几个值,多长时间之前的值
接受3个参数,第一个参数是获取几个最新的值,第二个参数是时间,用于多久之前的值可以被存储,第三个参数表示调度模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// 初始化了bufferSize, windowTime,如果不传并赋默认值Number.POSITIVE_INFINITY,第三个参数表示调度模式
constructor(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
private scheduler?: SchedulerLike) {
super();
this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
this._windowTime = windowTime < 1 ? 1 : windowTime;

if (windowTime === Number.POSITIVE_INFINITY) {
this._infiniteTimeWindow = true;
this.next = this.nextInfiniteTimeWindow; // 不同的时间分别对应不同的next方法
} else {
this.next = this.nextTimeWindow;
}
}

// 如果时间参数不传则next方法为下面这个,主要为将value传进_events数组中,如果超过bufferSize,则shift一个最前面的值
private nextInfiniteTimeWindow(value: T): void {
const _events = this._events;
_events.push(value);
if (_events.length > this._bufferSize) {
_events.shift();
}
super.next(value); // 调用Subject的next方法将值多播出去
}

// 如果时间参数有值则next方法为下面这个,这里面和上面的稍微有点不同,由于有了时间限制的概念,所以在存储每个值的时候也需要存储每个// 值的时间,所以通过ReplayEvent类来新建对象(包含time和value2个属性),_trimBufferThenGetEvents方法下面看下
private nextTimeWindow(value: T): void {
this._events.push(new ReplayEvent(this._getNow(), value));
this._trimBufferThenGetEvents();
super.next(value); // 调用Subject的next方法将值多播出去
}

// 该方法主要通过当前的时间和参数传进的时间做对比,还有bufferSize的大小,来决定this._events里的结果
private _trimBufferThenGetEvents(): ReplayEvent<T>[] {
const now = this._getNow();
const _bufferSize = this._bufferSize;
const _windowTime = this._windowTime;
const _events = <ReplayEvent<T>[]>this._events;

const eventsCount = _events.length;
let spliceCount = 0;

while (spliceCount < eventsCount) {
if ((now - _events[spliceCount].time) < _windowTime) {
break;
}
spliceCount++; // 找出几个不在规定时间内的events
}

if (eventsCount > _bufferSize) {
spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);
// 通过比较不在规定时间内的数量和传进的bufferSize大小,决定最终删除几个events
}

if (spliceCount > 0) {
_events.splice(0, spliceCount); // 删除不需要的events
}

return _events;
}

// 看看ReplaySubject的subscribe方法
_subscribe(subscriber: Subscriber<T>): Subscription {
const _infiniteTimeWindow = this._infiniteTimeWindow;
const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();
// 获取当前的_events, 不明白为何这边这样写,this._events === this._trimBufferThenGetEvents()的

const scheduler = this.scheduler;
const len = _events.length;
let subscription: Subscription;

// error或者销毁的情况处理
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.isStopped || this.hasError) {
subscription = Subscription.EMPTY;
} else {
// 添加subscriber到observers数组中
this.observers.push(subscriber);
subscription = new SubjectSubscription(this, subscriber); // 创建将来被销毁的资源
}

if (scheduler) { // scheduler是对方法的调度,用于何时执行
subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
}

// 将当前的events里的值传给subscriber的next函数,获取最新的几个值
if (_infiniteTimeWindow) {
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next(<T>_events[i]);
}
} else {
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next((<ReplayEvent<T>>_events[i]).value);
}
}

if (this.hasError) {
subscriber.error(this.thrownError);
} else if (this.isStopped) {
subscriber.complete();
}

return subscription;
}

Subject又提供了3种不同类型的subject,方便了在不同场合的应用,如Rxjs利用BehaviorSubject可以做成类似redux的状态管理,因为BehaviorSubject存储当前最新的值,并可以获取当前最新的值,且本身Rxjs就很适合异步应用,简单的实现可以看这儿