Observable operators - multicast, refCount, publish, share
昨天我们介绍完了各种Subject,不晓得各位读者还记不记得在一开始讲到Subject 时,是希望能够让Observable 有新订阅时,可以共用前一个订阅的执行而不要从头开始,如下面的例子
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subject = new Rx.Subject()
subject.subscribe(observerA)
source.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 1000);
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
上面这段程式码我们用subject 订阅了source,再把observerA 跟observerB 一个个订阅到subject,这样就可以让observerA 跟observerB 共用同一个执行。但这样的写法会让程式码看起来太过复杂,我们可以用Observable 的multicast operator 来简化这段程式
Operators
multicast
multicast 可以用来挂载subject 并回传一个可连结(connectable)的observable,如下
var source = Rx.Observable.interval(1000)
.take(3)
.multicast(new Rx.Subject());
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
source.subscribe(observerA); // subject.subscribe(observerA)
source.connect(); // source.subscribe(subject)
setTimeout(() => {
source.subscribe(observerB); // subject.subscribe(observerB)
}, 1000);
上面这段程式码我们透过multicast 来挂载一个subject 之后这个observable(source) 的订阅其实都是订阅到subject 上。
source.subscribe(observerA); // subject.subscribe(observerA)
必须真的等到执行connect()
后才会真的用subject订阅source,并开始送出元素,如果没有执行connect()
observable是不会真正执行的。
source.connect();
另外值得注意的是这里要退订的话,要把connect()
回传的subscription退订才会真正停止observable的执行,如下
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject()); // 无限的 observable
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
var realSubscription = source.connect();
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
}, 1000);
setTimeout(() => {
subscriptionA.unsubscribe();
subscriptionB.unsubscribe();
// 这里虽然 A 跟 B 都退订了,但 source 还会继续送元素
}, 5000);
setTimeout(() => {
realSubscription.unsubscribe();
// 这里 source 才会真正停止送元素
}, 7000);
上面这段的程式码,必须等到realSubscription.unsubscribe()
执行完,source才会真的结束。
虽然用了multicast 感觉会让我们处理的对象少一点,但必须搭配connect 一起使用还是让程式码有点复杂,通常我们会希望有observer 订阅时,就立即执行并发送元素,而不要再多执行一个方法(connect),这时我们就可以用refCount。
refCount
refCount 必须搭配multicast 一起使用,他可以建立一个只要有订阅就会自动connect 的observable,范例如下
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject())
.refCount();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
// 订阅数 0 => 1
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
// 订阅数 0 => 2
}, 1000);
上面这段程式码,当source 一被observerA 订阅时(订阅数从0 变成1),就会立即执行并发送元素,我们就不需要再额外执行connect。
同样的在退订时只要订阅数变成0 就会自动停止发送
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject())
.refCount();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
// 订阅数 0 => 1
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
// 订阅数 0 => 2
}, 1000);
setTimeout(() => {
subscriptionA.unsubscribe(); // 订阅数 2 => 1
subscriptionB.unsubscribe(); // 订阅数 1 => 0,source 停止发送元素
}, 5000);
publish
其实multicast(new Rx.Subject())
很常用到,我们有一个简化的写法那就是publish,下面这两段程式码是完全等价的
var source = Rx.Observable.interval(1000)
.publish()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();
加上Subject 的三种变形
var source = Rx.Observable.interval(1000)
.publishReplay(1)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.ReplaySubject(1))
// .refCount();
var source = Rx.Observable.interval(1000)
.publishBehavior(0)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.BehaviorSubject(0))
// .refCount();
var source = Rx.Observable.interval(1000)
.publishLast()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.AsyncSubject(1))
// .refCount();
share
另外publish + refCount 可以在简写成share
var source = Rx.Observable.interval(1000)
.share();
// var source = Rx.Observable.interval(1000)
// .publish()
// .refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();
今日小结
今天主要讲解了multicast 和refCount 两个operators 可以帮助我们既可能的简化程式码,并同时达到组播的效果。最后介绍publish 跟share 几个简化写法,这几个简化的写法是比较常见的,在理解multicast 跟refCount 运作方式后就能直接套用到publish 跟share 上。
不知道今天读者们有没有收获呢?如果有任何问题欢迎在下方留言给我,谢谢^^