Subject 基本观念

终于进到了RxJS 的第二个重点Subject,不知道读者们有没有发现?我们在这篇文章之前的范例,每个observable 都只订阅了一次,而实际上observable 是可以多次订阅的

var source = Rx.Observable.interval(1000).take(3);

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

source.subscribe(observerA);
source.subscribe(observerB);

// "A next: 0"
// "B next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "A complete!"
// "B next: 2"
// "B complete!"

JSBin | JSFiddle

上面这段程式码,分别用observerA跟observerB订阅了source,从log可以看出来observerA跟observerB都各自收到了元素,但请记得这两个observer其实是分开执行的也就是说他们是完全独立的,我们把observerB延迟订阅来证明看看

var source = Rx.Observable.interval(1000).take(3);

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

source.subscribe(observerA);
setTimeout(() => {
    source.subscribe(observerB);
}, 1000);

// "A next: 0"
// "A next: 1"
// "B next: 0"
// "A next: 2"
// "A complete!"
// "B next: 1"
// "B next: 2"
// "B complete!"

JSBin | JSFiddle

这里我们延迟一秒再用observerB 订阅,可以从log 中看出1 秒后observerA 已经印到了1,这时observerB 开始印却是从0 开始,而不是接着observerA 的进度,代表这两次的订阅是完全分开来执行的,或者说是每次的订阅都建立了一个新的执行。

这样的行为在大部分的情境下适用,但有些案例下我们会希望第二次订阅source 不会从头开始接收元素,而是从第一次订阅到当前处理的元素开始发送,我们把这种处理方式称为组播(multicast),那我们要如何做到组播呢?

手动建立subject

或许已经有读者想到解法了,其实我们可以建立一个中间人来订阅source 再由中间人转送资料出去,就可以达到我们想要的效果

var source = Rx.Observable.interval(1000).take(3);

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subject = {
    observers: [],
    addObserver: function(observer) {
        this.observers.push(observer)
    },
    next: function(value) {
        this.observers.forEach(o => o.next(value))    
    },
    error: function(error){
        this.observers.forEach(o => o.error(error))
    },
    complete: function() {
        this.observers.forEach(o => o.complete())
    }
}

subject.addObserver(observerA)

source.subscribe(subject);

setTimeout(() => {
    subject.addObserver(observerB);
}, 1000);

// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"

JSBin | JSFiddle

从上面的程式码可以看到,我们先建立了一个物件叫subject,这个物件具备observer 所有的方法(next, error, complete),并且还能addObserver 把observer 加到内部的清单中,每当有值送出就会遍历清单中的所有observer 并把值再次送出,这样一来不管多久之后加进来的observer,都会是从当前处理到的元素接续往下走,就像范例中所示,我们用subject 订阅source 并把observerA 加到subject 中,一秒后再把observerB 加到subject,这时就可以看到observerB 是直接收1 开始,这就是组播(multicast)的行为。

让我们把subject的addObserver改名成subscribe如下

var subject = {
    observers: [],
    subscribe: function(observer) {
        this.observers.push(observer)
    },
    next: function(value) {
        this.observers.forEach(o => o.next(value))    
    },
    error: function(error){
        this.observers.forEach(o => o.error(error))
    },
    complete: function() {
        this.observers.forEach(o => o.complete())
    }
}

应该有眼尖的读者已经发现,subject 其实就是用了Observer Pattern。但这边为了不要混淆Observer Pattern 跟RxJS 的observer 就不再内文提及。这也是为什么我们在一开始讲Observer Pattern 希望大家亲自实作的原因。

RxJS中的Subject确实是类似这样运作的,可以在原始码中看到

虽然上面是我们自己手写的subject,但运作方式跟RxJS 的Subject 实例是几乎一样的,我们把前面的程式码改成RxJS 提供的Subject 试试

var source = Rx.Observable.interval(1000).take(3);

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subject = new Rx.Subject()

subject.subscribe(observerA)

source.subscribe(subject);

setTimeout(() => {
    subject.subscribe(observerB);
}, 1000);

// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"

JSBin | JSFiddle

大家会发现使用方式跟前面是相同的,建立一个subject 先拿去订阅observable(source),再把我们真正的observer 加到subject 中,这样一来就能完成订阅,而每个加到subject 中的observer 都能整组的接收到相同的元素。

什么是Subject?

虽然前面我们已经示范直接手写一个简单的subject,但到底RxJS 中的Subject 的概念到底是什么呢?

首先Subject 可以拿去订阅Observable(source) 代表他是一个Observer,同时Subject 又可以被Observer(observerA, observerB) 订阅,代表他是一个Observable。

总结成两句话

  • Subject 同时是Observable 又是Observer
  • Subject 会对内部的observers 清单进行组播(multicast)

补充: 没事不要看!其实Subject 就是Observer Pattern 的实作并且继承自Observable。

今日小结

今天介绍了RxJS 中的第二个重点Subject,重点放在Subject 主要的运作方式,以及概念上的所代表的意思,如果今天还不太能够吸收的读者不用紧张,后面我们会讲到subject 的一些应用,到时候就会有更深的体会。

不知道今天读者么有没有收获呢?如果有任何问题,欢迎在下方留言给我。

results matching ""

    No results matching ""