Observable Operator - combineLatest, withLatestFrom, zip
非同步最难的地方在于,当有多个非同步行为同时触发且相互依赖,这时候我们要处理的逻辑跟状态就会变得极其复杂,甚至程式码很可能会在完成的一两天后就成了Legacy Code。
昨天我们最后讲到了merge
的用法,它的逻辑就像是OR(||)一样,可以把多个observable合并且同时处理,当其中任合一个observable送出元素时,我们都做相同的处理。
今天我们要讲的三个operators 则像是AND(&&) 逻辑,它们都是在多个元素送进来时,只输出一个新元素,但各自的行为上仍有差异,需要读者花点时间思考,建议在头脑清醒时阅读本篇文章。
Operators
combineLatest
首先我们要介绍的是combineLatest,它会取得各个observable 最后送出的值,再输出成一个值,我们直接看范例会比较好解释。
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.combineLatest(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete
大家第一次看到这个output 应该都会很困惑,我们直接来看Marble Diagram 吧!
source : ----0----1----2|
newest : --0--1--2--3--4--5|
combineLatest(newest, (x, y) => x + y);
example: ----01--23-4--(56)--7|
首先combineLatest
可以接收多个observable,最后一个参数是callback function,这个callback function接收的参数数量跟合并的observable数量相同,依照范例来说,因为我们这里合并了两个observable所以后面的callback function就接收x, y两个参数,x会接收从source发送出来的值,y会接收从newest发送出来的值。
最后一个重点就是一定会等两个observable都曾有送值出来才会呼叫我们传入的callback,所以这段程式是这样运行的
- newest送出了
0
,但此时source并没有送出过任何值,所以不会执行callback - source送出了
0
,此时newest最后一次送出的值为0
,把这两个数传入callback得到0
。 - newest送出了
1
,此时source最后一次送出的值为0
,把这两个数传入callback得到1
。 - newest送出了
2
,此时source最后一次送出的值为0
,把这两个数传入callback得到2
。 - source送出了
1
,此时newest最后一次送出的值为2
,把这两个数传入callback得到3
。 - newest送出了
3
,此时source最后一次送出的值为1
,把这两个数传入callback得到4
。 - source送出了
2
,此时newest最后一次送出的值为3
,把这两个数传入callback得到5
。 - source 结束,但newest 还没结束,所以example 还不会结束。
- newest送出了
4
,此时source最后一次送出的值为2
,把这两个数传入callback得到6
。 - newest送出了
5
,此时source最后一次送出的值为2
,把这两个数传入callback得到7
。 - newest 结束,因为source 也结束了,所以example 结束。
不管是source 还是newest 送出值来,只要另一方曾有送出过值(有最后的值),就会执行callback 并送出新的值,这就是combineLatest。
combineLatest 很常用在运算多个因子的结果,例如最常见的BMI 计算,我们身高变动时就拿上一次的体重计算新的BMI,当体重变动时则拿上一次的身高计算BMI,这就很适合用combineLatest 来处理!
zip
在讲withLatestFrom 之前,先让我们先来看一下zip 是怎么运作的,zip 会取每个observable 相同顺位的元素并传入callback,也就是说每个observable 的第n 个元素会一起被传入callback ,这里我们同样直接用范例讲解会比较清楚
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.zip(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete
Marble Diagram 长这样
source : ----0----1----2|
newest : --0--1--2--3--4--5|
zip(newest, (x, y) => x + y)
example: ----0----2----4|
以我们的范例来说,zip会等到source跟newest都送出了第一个元素,再传入callback,下次则等到source跟newest都送出了第二个元素再一起传入callback,所以运行的步骤如下:
- newest送出了第一个值
0
,但此时source并没有送出第一个值,所以不会执行callback。 - source送出了第一个值
0
,newest之前送出的第一个值为0
,把这两个数传入callback得到0
。 - newest送出了第二个值
1
,但此时source并没有送出第二个值,所以不会执行callback。 - newest送出了第三个值
2
,但此时source并没有送出第三个值,所以不会执行callback。 - source送出了第二个值
1
,newest之前送出的第二个值为1
,把这两个数传入callback得到2
。 - newest送出了第四个值
3
,但此时source并没有送出第四个值,所以不会执行callback。 - source送出了第三个值
2
,newest之前送出的第三个值为2
,把这两个数传入callback得到4
。 - source 结束example 就直接结束,因为source 跟newest 不会再有对应顺位的值
zip 会把各个observable 相同顺位送出的值传入callback,这很常拿来做demo 使用,比如我们想要间隔100ms 送出'h', 'e', 'l', 'l', 'o',就可以这么做
var source = Rx.Observable.from('hello');
var source2 = Rx.Observable.interval(100);
var example = source.zip(source2, (x, y) => x);
这里的Marble Diagram 就很简单
source : (hello)|
source2: -0-1-2-3-4-...
zip(source2, (x, y) => x)
example: -h-e-l-l-o|
这里我们利用zip 来达到原本只能同步送出的资料变成了非同步的,很适合用在建立示范用的资料。
建议大家平常没事不要乱用zip,除非真的需要。因为zip 必须cache 住还没处理的元素,当我们两个observable 一个很快一个很慢时,就会cache 非常多的元素,等待比较慢的那个observable。这很有可能造成记忆体相关的问题!
withLatestFrom
withLatestFrom 运作方式跟combineLatest 有点像,只是他有主从的关系,只有在主要的observable 送出新的值时,才会执行callback,附随的observable 只是在背景下运作。让我们看一个例子
var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);
var example = main.withLatestFrom(some, (x, y) => {
return y === 1 ? x.toUpperCase() : x;
});
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
先看一下 Marble Diagram
main : ----h----e----l----l----o|
some : --0--1--0--0--0--1|
withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x);
example: ----h----e----l----L----O|
withLatestFrom 会在main 送出值的时候执行callback,但请注意如果main 送出值时some 之前没有送出过任何值callback 仍然不会执行!
这里我们在main 送出值时,去判断some 最后一次送的值是不是1 来决定是否要切换大小写,执行步骤如下
- main送出了
h
,此时some上一次送出的值为0
,把这两个参数传入callback得到h
。 - main送出了
e
,此时some上一次送出的值为0
,把这两个参数传入callback得到e
。 - main送出了
l
,此时some上一次送出的值为0
,把这两个参数传入callback得到l
。 - main送出了
l
,此时some上一次送出的值为1
,把这两个参数传入callback得到L
。 - main送出了
o
,此时some上一次送出的值为1
,把这两个参数传入callback得到O
。
withLatestFrom 很常用在一些checkbox 型的功能,例如说一个编辑器,我们开启粗体后,打出来的字就都要变粗体,粗体就像是some observable,而我们打字就是main observable。
今日小结
今天介绍了三个合并用的operators,这三个operators 的callback 都会依照合并的observable 数量来传入参数,如果我们合并了三个observable,callback 就会有三个参数,而不管合并几个observable 都会只会回传一个值。
这几个operators 需要花比较多的时间思考,读者们不用硬记他的运作行为,只要稍微记得有这些operators 可以用就可以了。等到真的要用时,再重新回来看他们的运作方式做选择。
不知道读者们今天有没有收获呢?如果有任何问题,欢迎在下方留言给我,谢谢!