Subject 基本观念
终于进到了RxJS 的第二个重点Subject,不知道读者们有没有发现?我们在这篇文章之前的范例,每个observable 都只订阅了一次,而实际上observable 是可以多次订阅的
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
source.subscribe(observerA);
source.subscribe(observerB);
// "A next: 0"
// "B next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "A complete!"
// "B next: 2"
// "B complete!"
上面这段程式码,分别用observerA跟observerB订阅了source,从log可以看出来observerA跟observerB都各自收到了元素,但请记得这两个observer其实是分开执行的也就是说他们是完全独立的,我们把observerB延迟订阅来证明看看
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
source.subscribe(observerA);
setTimeout(() => {
source.subscribe(observerB);
}, 1000);
// "A next: 0"
// "A next: 1"
// "B next: 0"
// "A next: 2"
// "A complete!"
// "B next: 1"
// "B next: 2"
// "B complete!"
这里我们延迟一秒再用observerB 订阅,可以从log 中看出1 秒后observerA 已经印到了1,这时observerB 开始印却是从0 开始,而不是接着observerA 的进度,代表这两次的订阅是完全分开来执行的,或者说是每次的订阅都建立了一个新的执行。
这样的行为在大部分的情境下适用,但有些案例下我们会希望第二次订阅source 不会从头开始接收元素,而是从第一次订阅到当前处理的元素开始发送,我们把这种处理方式称为组播(multicast),那我们要如何做到组播呢?
手动建立subject
或许已经有读者想到解法了,其实我们可以建立一个中间人来订阅source 再由中间人转送资料出去,就可以达到我们想要的效果
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subject = {
observers: [],
addObserver: function(observer) {
this.observers.push(observer)
},
next: function(value) {
this.observers.forEach(o => o.next(value))
},
error: function(error){
this.observers.forEach(o => o.error(error))
},
complete: function() {
this.observers.forEach(o => o.complete())
}
}
subject.addObserver(observerA)
source.subscribe(subject);
setTimeout(() => {
subject.addObserver(observerB);
}, 1000);
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
从上面的程式码可以看到,我们先建立了一个物件叫subject,这个物件具备observer 所有的方法(next, error, complete),并且还能addObserver 把observer 加到内部的清单中,每当有值送出就会遍历清单中的所有observer 并把值再次送出,这样一来不管多久之后加进来的observer,都会是从当前处理到的元素接续往下走,就像范例中所示,我们用subject 订阅source 并把observerA 加到subject 中,一秒后再把observerB 加到subject,这时就可以看到observerB 是直接收1 开始,这就是组播(multicast)的行为。
让我们把subject的addObserver
改名成subscribe
如下
var subject = {
observers: [],
subscribe: function(observer) {
this.observers.push(observer)
},
next: function(value) {
this.observers.forEach(o => o.next(value))
},
error: function(error){
this.observers.forEach(o => o.error(error))
},
complete: function() {
this.observers.forEach(o => o.complete())
}
}
应该有眼尖的读者已经发现,subject 其实就是用了Observer Pattern。但这边为了不要混淆Observer Pattern 跟RxJS 的observer 就不再内文提及。这也是为什么我们在一开始讲Observer Pattern 希望大家亲自实作的原因。
RxJS中的Subject确实是类似这样运作的,可以在原始码中看到
虽然上面是我们自己手写的subject,但运作方式跟RxJS 的Subject 实例是几乎一样的,我们把前面的程式码改成RxJS 提供的Subject 试试
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subject = new Rx.Subject()
subject.subscribe(observerA)
source.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 1000);
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
大家会发现使用方式跟前面是相同的,建立一个subject 先拿去订阅observable(source),再把我们真正的observer 加到subject 中,这样一来就能完成订阅,而每个加到subject 中的observer 都能整组的接收到相同的元素。
什么是Subject?
虽然前面我们已经示范直接手写一个简单的subject,但到底RxJS 中的Subject 的概念到底是什么呢?
首先Subject 可以拿去订阅Observable(source) 代表他是一个Observer,同时Subject 又可以被Observer(observerA, observerB) 订阅,代表他是一个Observable。
总结成两句话
- Subject 同时是Observable 又是Observer
- Subject 会对内部的observers 清单进行组播(multicast)
补充: 没事不要看!其实Subject 就是Observer Pattern 的实作并且继承自Observable。
今日小结
今天介绍了RxJS 中的第二个重点Subject,重点放在Subject 主要的运作方式,以及概念上的所代表的意思,如果今天还不太能够吸收的读者不用紧张,后面我们会讲到subject 的一些应用,到时候就会有更深的体会。
不知道今天读者么有没有收获呢?如果有任何问题,欢迎在下方留言给我。