Observable Operators - catch, retry, retryWhen, repeat
我们已经快把所有基本的转换(Transformation)、过滤(Filter)和合并(Combination)的operators 讲完了。今天要讲错误处理(Error Handling)的operators,错误处理是非同步行为中的一大难题,尤其有多个交错的非同步行为时,更容易凸显错误处理的困难。
就让我们一起来看看在RxJS 中能如何处理错误吧!
Operators
catch
catch 是很常见的非同步错误处理方法,在RxJS 中也能够直接用catch 来处理错误,在RxJS 中的catch 可以回传一个observable 来送出新的值,让我们直接来看范例:
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.of('h'));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
这个范例我们每隔500毫秒会送出一个字串(String),并用字串的方法toUpperCase()
来把字串的英文字母改成大写,过程中可能未知的原因送出了一个数值(Number)2
导致发生例外(数值没有toUpperCase的方法),这时我们在后面接的catch就能抓到错误。
catch 可以回传一个新的Observable、Promise、Array 或任何Iterable 的物件,来传送之后的元素。
以我们的例子来说最后就会在送出X
就结束,画成Marble Diagram如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch(error => Rx.Observable.of('h'))
example: ----a----b----c----d----h|
这里可以看到,当错误发生后就会进到catch 并重新处理一个新的observable,我们可以利用这个新的observable 来送出我们想送的值。
也可以在遇到错误后,让observable 结束,如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.empty());
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
回传一个empty 的observable 来直接结束(complete)。
另外catch 的callback 能接收第二个参数,这个参数会接收当前的observalbe,我们可以回传当前的observable 来做到重新执行,范例如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch((error, obs) => obs);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
这里可以看到我们直接回传了当前的obserable(其实就是example)来重新执行,画成Marble Diagram 如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch((error, obs) => obs)
example: ----a----b----c----d--------a----b----c----d--..
因为是我们只是简单的示范,所以这里会一直无限循环,实务上通常会用在断线重连的情境。
另上面的处理方式有一个简化的写法,叫做retry()
。
retry
如果我们想要一个observable 发生错误时,重新尝试就可以用retry 这个方法,跟我们前一个讲范例的行为是一致
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
通常这种无限的retry
会放在即时同步的重新连接,让我们在连线断掉后,不断的尝试。另外我们也可以设定只尝试几次,如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// d
// a
// b
// c
// d
// Error: TypeError: x.toUpperCase is not a function
这里我们对retry传入一个数值1
,能够让我们只重复尝试1次后送出错误,画成Marble Diagram如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retry(1)
example: ----a----b----c----d--------a----b----c----d----X|
这种处理方式很适合用在HTTP request 失败的场景中,我们可以设定重新发送几次后,再秀出错误讯息。
retryWhen
RxJS还提供了另一种方法retryWhen
,他可以把例外发生的元素放到一个observable中,让我们可以直接操作这个observable,并等到这个observable操作完后再重新订阅一次原本的observable。
这里我们直接来看程式码
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(errorObs => errorObs.delay(1000));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
这里retryWhen 我们传入一个callback,这个callback 有一个参数会传入一个observable,这个observable 不是原本的observable(example) 而是例外事件送出的错误所组成的一个observable,我们可以对这个由错误所组成的observable 做操作,等到这次的处理完成后就会重新订阅我们原本的observable。
这个范例我们是把错误的observable 送出错误延迟1 秒,这会使后面重新订阅的动作延迟1 秒才执行,画成Marble Diagram 如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retryWhen(errorObs => errorObs.delay(1000))
example: ----a----b----c----d-------------------a----b----c----d----...
从上图可以看到后续重新订阅的行为就被延后了,但实务上我们不太会用retryWhen 来做重新订阅的延迟,通常是直接用catch 做到这件事。这里只是为了示范retryWhen 的行为,实务上我们通常会把retryWhen 拿来做错误通知或是例外收集,如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(
errorObs => errorObs.map(err => fetch('...')));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
这里的errorObs.map(err =>fetch('...'))
可以把errorObs里的每个错误变成API的发送,通常这里个API会像是送讯息到公司的通讯频道(Slack等等),这样可以让工程师马上知道可能哪个API挂了,这样我们就能即时地处理。
retryWhen 实际上是在背地里建立一个Subject 并把错误放入,会在对这个Subject 进行内部的订阅,因为我们还没有讲到Subject 的观念,大家可以先把它当作Observable 就好了,另外记得这个observalbe 预设是无限的,如果我们把它结束,原本的observable 也会跟着结束。
repeat
我们有时候可能会想要retry 一直重复订阅的效果,但没有错误发生,这时就可以用repeat 来做到这件事,范例如下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// a
// b
// c
// complete
这里repeat 的行为跟retry 基本一致,只是retry 只有在例外发生时才触发,画成Marble Diagram 如下
source : ----a----b----c|
repeat(1)
example: ----a----b----c----a----b----c|
同样的我们可以不给参数让他无限循环,如下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
这样我们就可以做动不断重复的行为,这个可以在建立轮询时使用,让我们不断地发request 来更新画面。
最后我们来看一个错误处理在实务应用中的小范例
const title = document.getElementById('title');
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x)
.map(x => x.toUpperCase());
// 通常 source 會是建立即時同步的連線,像是 web socket
var example = source.catch(
(error, obs) => Rx.Observable.empty()
.startWith('連線發生錯誤: 5秒後重連')
.concat(obs.delay(5000))
);
example.subscribe({
next: (value) => { title.innerText = value },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
这个范例其实就是模仿在即时同步断线时,利用catch 返回一个新的observable,这个observable 会先送出错误讯息并且把原本的observable 延迟5 秒再做合并,虽然这只是一个模仿,但它清楚的展示了RxJS 在做错误处理时的灵活性。
今日小结
今天我们讲了三个错误处理的方法还有一个repeat operator,这几个方法都很有机会在实务上用到,不知道今天大家有没有收获呢?如果有任何问题,欢迎在下方留言给我,谢谢!