Observable Operators - switch, mergeAll, concatAll

今天我们要讲三个operators,这三个operators 都是用来处理Higher Order Observable。所谓的Higher Order Observable 就是指一个Observable 送出的元素还是一个Observable,就像是二维阵列一样,一个阵列中的每个元素都是阵列。如果用泛型来表达就像是

Observable<Observable<T>>

通常我们需要的是第二层Observable 送出的元素,所以我们希望可以把二维的Observable 改成一维的,像是下面这样

Observable<Observable<T>> => Observable<T>

其实想要做到这件事有三个方法switch、mergeAll 和concatAll,其中concatAll 我们在之前的文章已经稍微讲过了,今天这篇文章会讲解这三个operators 各自的效果跟差异。

Operators

concatAll

我们在讲简易拖拉的范例时就有讲过这个operator,concatAll 最重要的重点就是他会处理完前一个observable 才会在处理下一个observable,让我们来看一个范例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// (点击后)
// 0
// 1
// 2
// 3
// 4
// 5 ...

JSBin

上面这段程式码,当我们点击画面时就会开始送出数值,如果用Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     concatAll()
example: ----------------0----1----2----3----4--..

从Marble Diagram可以看得出来,当我们点击一下click事件会被转成一个observable而这个observable会每一秒送出一个递增的数值,当我们用concatAll之后会把二维的observable摊平成一维的observable ,但concatAll会一个一个处理,一定是等前一个observable完成(complete)才会处理下一个observable,因为现在送出observable是无限的永远不会完成(complete),就导致他永远不会处理第二个送出的observable!

我们再看一个例子

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

现在我们把送出的observable 限制只取前三个元素,用Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \
                    \ ----0----1----2|   ----0----1----2|
                     ----0----1----2|
                     concatAll()
example: ----------------0----1----2----0----1----2--..

这里我们把送出的observable 变成有限的,只会送出三个元素,这时就能看得出来concatAll 不管两个observable 送出的时间多么相近,一定会先处理前一个observable 再处理下一个。

switch

switch 同样能把二维的observable 摊平成一维的,但他们在行为上有很大的不同,我们来看下面这个范例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.switch();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin

用Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: -----------------0----1----2--------0----1--...

switch最重要的就是他会在新的observable送出后直接处理新的observable不管前一个observable是否完成,每当有新的observable送出就会直接把旧的observable退订(unsubscribe),永远只处理最新的observable!

所以在这上面的Marble Diagram 可以看得出来第一次送出的observable 跟第二次送出的observable 时间点太相近,导致第一个observable 还来不及送出元素就直接被退订了,当下一次送出observable 就又会把前一次的observable 退订。

mergeAll

我们之前讲过merge 他可以让多个observable 同时送出元素,mergeAll 也是同样的道理,它会把二维的observable 转成一维的,并且能够同时处理所有的observable,让我们来看这个范例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.mergeAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

上面这段程式码用Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: ----------------00---11---22---33---(04)4--...

从Marble Diagram 可以看出来,所有的observable 是并行(Parallel)处理的,也就是说mergeAll 不会像switch 一样退订(unsubscribe)原先的observable 而是并行处理多个observable。以我们的范例来说,当我们点击越多下,最后送出的频率就会越快。

另外mergeAll 可以传入一个数值,这个数值代表他可以同时处理的observable 数量,我们来看一个例子

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));

var example = source.mergeAll(2);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

这里我们送出的observable 改成取前三个,并且让mergeAll 最多只能同时处理2 个observable,用Marble Diagram 表示如下

click  : ---------c-c----------o----------.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o----------c----------..
                   \ \          \----0----1----2|     
                    \ ----0----1----2|  
                     ----0----1----2|
                     mergeAll(2)
example: ----------------00---11---22---0----1----2--..

当mergeAll 传入参数后,就会等处理中的其中一个observable 完成,再去处理下一个。以我们的例子来说,前面两个observabel 可以被并行处理,但第三个observable 必须等到第一个observable 结束后,才会开始。

我们可以利用这个参数来决定要同时处理几个observable,如果我们传入1其行为就会跟concatAll是一模一样的,这点在原始码可以看到他们是完全相同的。

今日小结

今天介绍了三个可以处理High Order Observable 的方法,并讲解了三个方法的差异,不知道读者有没有收获呢?如果有任何问题欢迎在下方留言给我,感谢!

results matching ""

    No results matching ""