Observable Operator - combineLatest, withLatestFrom, zip

非同步最难的地方在于,当有多个非同步行为同时触发且相互依赖,这时候我们要处理的逻辑跟状态就会变得极其复杂,甚至程式码很可能会在完成的一两天后就成了Legacy Code。

昨天我们最后讲到了merge的用法,它的逻辑就像是OR(||)一样,可以把多个observable合并且同时处理,当其中任合一个observable送出元素时,我们都做相同的处理。

今天我们要讲的三个operators 则像是AND(&&) 逻辑,它们都是在多个元素送进来时,只输出一个新元素,但各自的行为上仍有差异,需要读者花点时间思考,建议在头脑清醒时阅读本篇文章。

Operators

combineLatest

首先我们要介绍的是combineLatest,它会取得各个observable 最后送出的值,再输出成一个值,我们直接看范例会比较好解释。

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.combineLatest(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete

JSBin | JSFiddle

大家第一次看到这个output 应该都会很困惑,我们直接来看Marble Diagram 吧!

source : ----0----1----2|
newest : --0--1--2--3--4--5|

    combineLatest(newest, (x, y) => x + y);

example: ----01--23-4--(56)--7|

首先combineLatest可以接收多个observable,最后一个参数是callback function,这个callback function接收的参数数量跟合并的observable数量相同,依照范例来说,因为我们这里合并了两个observable所以后面的callback function就接收x, y两个参数,x会接收从source发送出来的值,y会接收从newest发送出来的值。

最后一个重点就是一定会等两个observable都曾有送值出来才会呼叫我们传入的callback,所以这段程式是这样运行的

  • newest送出了0,但此时source并没有送出过任何值,所以不会执行callback
  • source送出了0,此时newest最后一次送出的值为0,把这两个数传入callback得到0
  • newest送出了1,此时source最后一次送出的值为0,把这两个数传入callback得到1
  • newest送出了2,此时source最后一次送出的值为0,把这两个数传入callback得到2
  • source送出了1,此时newest最后一次送出的值为2,把这两个数传入callback得到3
  • newest送出了3,此时source最后一次送出的值为1,把这两个数传入callback得到4
  • source送出了2,此时newest最后一次送出的值为3,把这两个数传入callback得到5
  • source 结束,但newest 还没结束,所以example 还不会结束。
  • newest送出了4,此时source最后一次送出的值为2,把这两个数传入callback得到6
  • newest送出了5,此时source最后一次送出的值为2,把这两个数传入callback得到7
  • newest 结束,因为source 也结束了,所以example 结束。

不管是source 还是newest 送出值来,只要另一方曾有送出过值(有最后的值),就会执行callback 并送出新的值,这就是combineLatest。

combineLatest 很常用在运算多个因子的结果,例如最常见的BMI 计算,我们身高变动时就拿上一次的体重计算新的BMI,当体重变动时则拿上一次的身高计算BMI,这就很适合用combineLatest 来处理!

zip

在讲withLatestFrom 之前,先让我们先来看一下zip 是怎么运作的,zip 会取每个observable 相同顺位的元素并传入callback,也就是说每个observable 的第n 个元素会一起被传入callback ,这里我们同样直接用范例讲解会比较清楚

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.zip(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete

JSBin | JSFiddle

Marble Diagram 长这样

source : ----0----1----2|
newest : --0--1--2--3--4--5|
    zip(newest, (x, y) => x + y)
example: ----0----2----4|

以我们的范例来说,zip会等到source跟newest都送出了第一个元素,再传入callback,下次则等到source跟newest都送出了第二个元素再一起传入callback,所以运行的步骤如下:

  • newest送出了第一个0,但此时source并没有送出第一个值,所以不会执行callback。
  • source送出了第一个0,newest之前送出的第一个值为0,把这两个数传入callback得到0
  • newest送出了第二个1,但此时source并没有送出第二个值,所以不会执行callback。
  • newest送出了第三个2,但此时source并没有送出第三个值,所以不会执行callback。
  • source送出了第二个1,newest之前送出的第二个值为1,把这两个数传入callback得到2
  • newest送出了第四个3,但此时source并没有送出第四个值,所以不会执行callback。
  • source送出了第三个2,newest之前送出的第三个值为2,把这两个数传入callback得到4
  • source 结束example 就直接结束,因为source 跟newest 不会再有对应顺位的值

zip 会把各个observable 相同顺位送出的值传入callback,这很常拿来做demo 使用,比如我们想要间隔100ms 送出'h', 'e', 'l', 'l', 'o',就可以这么做

var source = Rx.Observable.from('hello');
var source2 = Rx.Observable.interval(100);

var example = source.zip(source2, (x, y) => x);

这里的Marble Diagram 就很简单

source : (hello)|
source2: -0-1-2-3-4-...
        zip(source2, (x, y) => x)
example: -h-e-l-l-o|

这里我们利用zip 来达到原本只能同步送出的资料变成了非同步的,很适合用在建立示范用的资料。

建议大家平常没事不要乱用zip,除非真的需要。因为zip 必须cache 住还没处理的元素,当我们两个observable 一个很快一个很慢时,就会cache 非常多的元素,等待比较慢的那个observable。这很有可能造成记忆体相关的问题!

withLatestFrom

withLatestFrom 运作方式跟combineLatest 有点像,只是他有主从的关系,只有在主要的observable 送出新的值时,才会执行callback,附随的observable 只是在背景下运作。让我们看一个例子

var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);

var example = main.withLatestFrom(some, (x, y) => {
    return y === 1 ? x.toUpperCase() : x;
});

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

先看一下 Marble Diagram

main   : ----h----e----l----l----o|
some   : --0--1--0--0--0--1|

withLatestFrom(some, (x, y) =>  y === 1 ? x.toUpperCase() : x);

example: ----h----e----l----L----O|

withLatestFrom 会在main 送出值的时候执行callback,但请注意如果main 送出值时some 之前没有送出过任何值callback 仍然不会执行!

这里我们在main 送出值时,去判断some 最后一次送的值是不是1 来决定是否要切换大小写,执行步骤如下

  • main送出了h,此时some上一次送出的值为0,把这两个参数传入callback得到h
  • main送出了e,此时some上一次送出的值为0,把这两个参数传入callback得到e
  • main送出了l,此时some上一次送出的值为0,把这两个参数传入callback得到l
  • main送出了l,此时some上一次送出的值为1,把这两个参数传入callback得到L
  • main送出了o,此时some上一次送出的值为1,把这两个参数传入callback得到O

withLatestFrom 很常用在一些checkbox 型的功能,例如说一个编辑器,我们开启粗体后,打出来的字就都要变粗体,粗体就像是some observable,而我们打字就是main observable。

今日小结

今天介绍了三个合并用的operators,这三个operators 的callback 都会依照合并的observable 数量来传入参数,如果我们合并了三个observable,callback 就会有三个参数,而不管合并几个observable 都会只会回传一个值。

这几个operators 需要花比较多的时间思考,读者们不用硬记他的运作行为,只要稍微记得有这些operators 可以用就可以了。等到真的要用时,再重新回来看他们的运作方式做选择。

不知道读者们今天有没有收获呢?如果有任何问题,欢迎在下方留言给我,谢谢!

results matching ""

    No results matching ""