Observable Operators - catch, retry, retryWhen, repeat

我们已经快把所有基本的转换(Transformation)、过滤(Filter)和合并(Combination)的operators 讲完了。今天要讲错误处理(Error Handling)的operators,错误处理是非同步行为中的一大难题,尤其有多个交错的非同步行为时,更容易凸显错误处理的困难。

就让我们一起来看看在RxJS 中能如何处理错误吧!

Operators

catch

catch 是很常见的非同步错误处理方法,在RxJS 中也能够直接用catch 来处理错误,在RxJS 中的catch 可以回传一个observable 来送出新的值,让我们直接来看范例:

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .catch(error => Rx.Observable.of('h'));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

这个范例我们每隔500毫秒会送出一个字串(String),并用字串的方法toUpperCase()来把字串的英文字母改成大写,过程中可能未知的原因送出了一个数值(Number)2导致发生例外(数值没有toUpperCase的方法),这时我们在后面接的catch就能抓到错误。

catch 可以回传一个新的Observable、Promise、Array 或任何Iterable 的物件,来传送之后的元素。

以我们的例子来说最后就会在送出X就结束,画成Marble Diagram如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
        catch(error => Rx.Observable.of('h'))
example: ----a----b----c----d----h|

这里可以看到,当错误发生后就会进到catch 并重新处理一个新的observable,我们可以利用这个新的observable 来送出我们想送的值。

也可以在遇到错误后,让observable 结束,如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .catch(error => Rx.Observable.empty());

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

回传一个empty 的observable 来直接结束(complete)。

另外catch 的callback 能接收第二个参数,这个参数会接收当前的observalbe,我们可以回传当前的observable 来做到重新执行,范例如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .catch((error, obs) => obs);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

这里可以看到我们直接回传了当前的obserable(其实就是example)来重新执行,画成Marble Diagram 如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
        catch((error, obs) => obs)
example: ----a----b----c----d--------a----b----c----d--..

因为是我们只是简单的示范,所以这里会一直无限循环,实务上通常会用在断线重连的情境。

另上面的处理方式有一个简化的写法,叫做retry()

retry

如果我们想要一个observable 发生错误时,重新尝试就可以用retry 这个方法,跟我们前一个讲范例的行为是一致

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retry();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

通常这种无限的retry会放在即时同步的重新连接,让我们在连线断掉后,不断的尝试。另外我们也可以设定只尝试几次,如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retry(1);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
}); 
// a
// b
// c
// d
// a
// b
// c
// d
// Error: TypeError: x.toUpperCase is not a function

JSBin | JSFiddle

这里我们对retry传入一个数值1,能够让我们只重复尝试1次后送出错误,画成Marble Diagram如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
                retry(1)
example: ----a----b----c----d--------a----b----c----d----X|

这种处理方式很适合用在HTTP request 失败的场景中,我们可以设定重新发送几次后,再秀出错误讯息。

retryWhen

RxJS还提供了另一种方法retryWhen,他可以把例外发生的元素放到一个observable中,让我们可以直接操作这个observable,并等到这个observable操作完后再重新订阅一次原本的observable。

这里我们直接来看程式码

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retryWhen(errorObs => errorObs.delay(1000));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

这里retryWhen 我们传入一个callback,这个callback 有一个参数会传入一个observable,这个observable 不是原本的observable(example) 而是例外事件送出的错误所组成的一个observable,我们可以对这个由错误所组成的observable 做操作,等到这次的处理完成后就会重新订阅我们原本的observable。

这个范例我们是把错误的observable 送出错误延迟1 秒,这会使后面重新订阅的动作延迟1 秒才执行,画成Marble Diagram 如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
        retryWhen(errorObs => errorObs.delay(1000))
example: ----a----b----c----d-------------------a----b----c----d----...

从上图可以看到后续重新订阅的行为就被延后了,但实务上我们不太会用retryWhen 来做重新订阅的延迟,通常是直接用catch 做到这件事。这里只是为了示范retryWhen 的行为,实务上我们通常会把retryWhen 拿来做错误通知或是例外收集,如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retryWhen(
                errorObs => errorObs.map(err => fetch('...')));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

这里的errorObs.map(err =>fetch('...'))可以把errorObs里的每个错误变成API的发送,通常这里个API会像是送讯息到公司的通讯频道(Slack等等),这样可以让工程师马上知道可能哪个API挂了,这样我们就能即时地处理。

retryWhen 实际上是在背地里建立一个Subject 并把错误放入,会在对这个Subject 进行内部的订阅,因为我们还没有讲到Subject 的观念,大家可以先把它当作Observable 就好了,另外记得这个observalbe 预设是无限的,如果我们把它结束,原本的observable 也会跟着结束。

repeat

我们有时候可能会想要retry 一直重复订阅的效果,但没有错误发生,这时就可以用repeat 来做到这件事,范例如下

var source = Rx.Observable.from(['a','b','c'])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source.repeat(1);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

// a
// b
// c
// a
// b
// c
// complete

JSBin | JSFiddle

这里repeat 的行为跟retry 基本一致,只是retry 只有在例外发生时才触发,画成Marble Diagram 如下

source : ----a----b----c|
            repeat(1)
example: ----a----b----c----a----b----c|

同样的我们可以不给参数让他无限循环,如下

var source = Rx.Observable.from(['a','b','c'])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source.repeat();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

这样我们就可以做动不断重复的行为,这个可以在建立轮询时使用,让我们不断地发request 来更新画面。

最后我们来看一个错误处理在实务应用中的小范例

const title = document.getElementById('title');

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x)
            .map(x => x.toUpperCase()); 
            // 通常 source 會是建立即時同步的連線,像是 web socket

var example = source.catch(
                (error, obs) => Rx.Observable.empty()
                               .startWith('連線發生錯誤: 5秒後重連')
                               .concat(obs.delay(5000))
                 );

example.subscribe({
    next: (value) => { title.innerText = value },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

这个范例其实就是模仿在即时同步断线时,利用catch 返回一个新的observable,这个observable 会先送出错误讯息并且把原本的observable 延迟5 秒再做合并,虽然这只是一个模仿,但它清楚的展示了RxJS 在做错误处理时的灵活性。

今日小结

今天我们讲了三个错误处理的方法还有一个repeat operator,这几个方法都很有机会在实务上用到,不知道今天大家有没有收获呢?如果有任何问题,欢迎在下方留言给我,谢谢!

results matching ""

    No results matching ""