Subject继承至Observable, 是一种特殊类型的Observable,支持同时输出值到多个订阅者,自己维护一个订阅者序列,就像是观察者模式。
下面我们看看源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 constructor () { super (); } next(value?: T) { if (this .closed) { throw new ObjectUnsubscribedError(); } if (!this .isStopped) { const { observers } = this ; const len = observers.length; const copy = observers.slice(); for (let i = 0 ; i < len; i++) { copy[i].next(value); } } } _subscribe(subscriber: Subscriber<T>): Subscription { if (this .closed) { throw new ObjectUnsubscribedError(); } else if (this .hasError) { subscriber.error(this .thrownError); return Subscription.EMPTY; } else if (this .isStopped) { subscriber.complete(); return Subscription.EMPTY; } else { this .observers.push(subscriber); return new SubjectSubscription(this , subscriber); } }
由于Subject自带next, error, complete等方法,则Subject对象可以作为参数传进Observable.subscribe方法中, 如:observable$.subscribe(new Subject());
Subject还有3中类型的传播值的方式,这三种都extends自Subject类
1.BehaviorSubject 储存最新的值,并将最新的值发给订阅者,一旦订阅者subscribe,则将会收到最新存储的值,下面我们看看源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 constructor(private _value: T) { super (); } _subscribe(subscriber: Subscriber<T>): Subscription { const subscription = super ._subscribe(subscriber); if (subscription && !(<SubscriptionLike>subscription).closed) { subscriber.next (this ._value); } return subscription; }next (value: T): void { super .next (this ._value = value); 调用Subject的next 方法,将当前值多播 }
2.AsyncSubject 当complete结束时,将Subject最后的值传给订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 _subscribe(subscriber: Subscriber<any>): Subscription { if (this .hasError) { subscriber.error(this .thrownError); return Subscription.EMPTY; } else if (this .hasCompleted && this .hasNext) { subscriber.next(this .value); subscriber.complete(); return Subscription.EMPTY; } return super ._subscribe(subscriber); } next(value: T): void { if (!this .hasCompleted) { this .value = value; this .hasNext = true ; } } complete(): void { this .hasCompleted = true ; if (this .hasNext) { super .next(this .value); } super .complete(); }
3.ReplaySubject 主要功能是当订阅者subscribe时候,可以获取最新的几个值,多长时间之前的值 接受3个参数,第一个参数是获取几个最新的值,第二个参数是时间,用于多久之前的值可以被存储,第三个参数表示调度模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 constructor (bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, private scheduler?: SchedulerLike) { super (); this ._bufferSize = bufferSize < 1 ? 1 : bufferSize; this ._windowTime = windowTime < 1 ? 1 : windowTime; if (windowTime === Number.POSITIVE_INFINITY) { this ._infiniteTimeWindow = true ; this .next = this .nextInfiniteTimeWindow; } else { this .next = this .nextTimeWindow; } }private nextInfiniteTimeWindow(value: T): void { const _events = this ._events; _events.push(value); if (_events.length > this ._bufferSize) { _events.shift(); } super .next(value); }private nextTimeWindow(value: T): void { this ._events.push(new ReplayEvent(this ._getNow(), value)); this ._trimBufferThenGetEvents(); super .next(value); }private _trimBufferThenGetEvents(): ReplayEvent<T>[] { const now = this ._getNow(); const _bufferSize = this ._bufferSize; const _windowTime = this ._windowTime; const _events = <ReplayEvent<T>[]>this ._events; const eventsCount = _events.length; let spliceCount = 0 ; while (spliceCount < eventsCount) { if ((now - _events[spliceCount].time) < _windowTime) { break ; } spliceCount++; } if (eventsCount > _bufferSize) { spliceCount = Math.max(spliceCount, eventsCount - _bufferSize); } if (spliceCount > 0 ) { _events.splice(0 , spliceCount); } return _events; } _subscribe(subscriber: Subscriber<T>): Subscription { const _infiniteTimeWindow = this ._infiniteTimeWindow; const _events = _infiniteTimeWindow ? this ._events : this ._trimBufferThenGetEvents(); const scheduler = this .scheduler; const len = _events.length; let subscription: Subscription; if (this .closed) { throw new ObjectUnsubscribedError(); } else if (this .isStopped || this .hasError) { subscription = Subscription.EMPTY; } else { this .observers.push(subscriber); subscription = new SubjectSubscription(this , subscriber); } if (scheduler) { subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler)); } if (_infiniteTimeWindow) { for (let i = 0 ; i < len && !subscriber.closed; i++) { subscriber.next(<T>_events[i]); } } else { for (let i = 0 ; i < len && !subscriber.closed; i++) { subscriber.next((<ReplayEvent<T>>_events[i]).value); } } if (this .hasError) { subscriber.error(this .thrownError); } else if (this .isStopped) { subscriber.complete(); } return subscription; }
Subject又提供了3种不同类型的subject,方便了在不同场合的应用,如Rxjs利用BehaviorSubject可以做成类似redux的状态管理,因为BehaviorSubject存储当前最新的值,并可以获取当前最新的值,且本身Rxjs就很适合异步应用,简单的实现可以看这儿