建立Observable(一)
Observable 是RxJS 的核心,今天让我们从如何建立Observable 开始!
不想看文章的人,可以直接看影片喔!
今天大家看文章一定要分清楚Observable跟Observer,不要搞混。
前几天我们把所有重要的观念及前置的知识都讲完了,今天要正式进入RxJS的应用,整个RxJS说白了就是一个核心三个重点。
一个核心是Observable 再加上相关的Operators(map, filter...),这个部份是最重要的,其他三个重点本质上也是围绕着这个核心在转,所以我们会花将近20 天的篇数讲这个部份的观念及使用案例。
另外三个重点分别是
- Observer
- Subject
- Schedulers
Observer 是这三个当中一定会用到却是最简单的,所以我们今天就会把它介绍完。
Subject 一般应用到的频率就相对低很多,但如果想要看懂RxJS 相关的Library 或Framework,Subject 就是一定要会的重点,所以这个部份我们大概会花3-5 天的时间讲解。
至于Schedulers 则是要解决RxJS 衍伸出的最后一道问题,这个部份会视情况加入或是在30 天后补完。
redux-observable就是用了Subject实作的
让我卖个关子,先不说RxJS 最后一道问题是什么。
说了这么多,我们赶快进入到今天的主题Observable 吧!
建立Observable:create
建立Observable的方法有非常多种,其中create
是最基本的方法。create
方法在Rx.Observable
物件中,要传入一个callback function ,这个callback function会接收一个observer参数,如下
var observable = Rx.Observable
.create(function(observer) {
observer.next('Jerry'); // RxJS 4.x 以前的版本用 onNext
observer.next('Anna');
})
这个callback function 会定义observable 将会如何发送值。
虽然Observable可以被
create
,但实务上我们通常都使用creation operator像是from, of, fromEvent, fromPromise等。这里只是为了从基本的开始讲解所以才用create
我们可以订阅这个observable,来接收他送出的值,程式码如下
var observable = Rx.Observable
.create(function(observer) {
observer.next('Jerry'); // RxJS 4.x 以前的版本用 onNext
observer.next('Anna');
})
// 订阅这个 observable
observable.subscribe(function(value) {
console.log(value);
})
当我们订阅这个observable,他就会依序送出'Jerry' 'Anna'
两个字串。
订阅Observable 跟addEventListener 在实作上其实有非常大的不同。虽然在行为上很像,但实际上Observable 根本没有管理一个订阅的清单,这个部份的细节我们留到最后说明!
这里有一个重点,很多人认为RxJS 是在做非同步处理,所以所有行为都是非同步的。但其实这个观念是错的,RxJS 确实主要在处理非同步行为没错,但也同时能处理同步行为,像是上面的程式码就是同步执行的。
证明如下
var observable = Rx.Observable
.create(function(observer) {
observer.next('Jerry'); // RxJS 4.x 以前的版本用 onNext
observer.next('Anna');
})
console.log('start');
observable.subscribe(function(value) {
console.log(value);
});
console.log('end');
上面这段程式码会印出
start
Jerry
Anna
end
而不是
start
end
Jerry
Anna
所以很明显的这段程式码是同步执行的,当然我们可以拿它来处理非同步的行为!
var observable = Rx.Observable
.create(function(observer) {
observer.next('Jerry'); // RxJS 4.x 以前的版本用 onNext
observer.next('Anna');
setTimeout(() => {
observer.next('RxJS 30 days!');
}, 30)
})
console.log('start');
observable.subscribe(function(value) {
console.log(value);
});
console.log('end');
这时就会印出
start
Jerry
Anna
end
RxJS 30 days!
从上述的程式码能看得出来
Observable 同时可以处理同步与非同步的行为!
观察者Observer
Observable 可以被订阅(subscribe),或说可以被观察,而订阅Observable 的物件又称为**观察者(Observer)**。观察者是一个具有三个方法(method)的物件,每当Observable 发生事件时,便会呼叫观察者相对应的方法。
注意这里的观察者(Observer)跟上一篇讲的观察者模式(Observer Pattern)无关,观察者模式是一种设计模式,是思考问题的解决过程,而这里讲的观察者是一个被定义的物件。
观察者的三个方法(method):
next:每当Observable 发送出新的值,next 方法就会被呼叫。
complete:在Observable 没有其他的资料可以取得时,complete 方法就会被呼叫,在complete 被呼叫之后,next 方法就不会再起作用。
error:每当Observable 内发生错误时,error 方法就会被呼叫。
说了这么多,我们还是直接来建立一个观察者吧!
var observable = Rx.Observable
.create(function(observer) {
observer.next('Jerry');
observer.next('Anna');
observer.complete();
observer.next('not work');
})
// 宣告一个观察者,具备 next, error, complete 三个方法
var observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
}
// 用我们定义好的观察者,来订阅这个 observable
observable.subscribe(observer)
上面这段程式码会印出
Jerry
Anna
complete
上面的范例可以看得出来在complete执行后,next就会自动失效,所以没有印出not work
。
下面则是送出错误的范例
var observable = Rx.Observable
.create(function(observer) {
try {
observer.next('Jerry');
observer.next('Anna');
throw 'some exception';
} catch(e) {
observer.error(e)
}
});
// 宣告一个观察者,具备 next, error, complete 三个方法
var observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log('Error: ', error)
},
complete: function() {
console.log('complete')
}
}
// 用我们定义好的观察者,来订阅这个 observable
observable.subscribe(observer)
这里就会执行error的function印出Error: some exception
。
另外观察者可以是不完整的,他可以只具有一个next 方法,如下
var observer = {
next: function(value) {
//...
}
}
有时候Observable会是一个无限的序列,例如click事件,这时
complete
方法就有可能永远不会被呼叫!
我们也可以直接把next, error, complete三个function依序传入observable.subscribe
,如下:
observable.subscribe(
value => { console.log(value); },
error => { console.log('Error: ', error); },
() => { console.log('complete') }
)
observable.subscribe
会在内部自动组成observer 物件来操作。
实作细节
我们前面提到了,其实Observable 的订阅跟addEventListener 在实作上有蛮大的差异,虽然他们的行为很像!
addEventListener 本质上就是Observer Pattern 的实作,在内部会有一份订阅清单,像是我们昨天实作的Producerclass Producer {
constructor() {
this.listeners = [];
}
addListener(listener) {
if(typeof listener === 'function') {
this.listeners.push(listener)
} else {
throw new Error('listener 必須是 function')
}
}
removeListener(listener) {
this.listeners.splice(this.listeners.indexOf(listener), 1)
}
notify(message) {
this.listeners.forEach(listener => {
listener(message);
})
}
}
我们在内部储存了一份所有的监听者清单(this.listeners
),在要发布通知时会对逐一的呼叫这份清单的监听者。
但在Observable 不是这样实作的,在其内部并没有一份订阅者的清单。订阅Observable 的行为比较像是执行一个物件的方法,并把资料传进这个方法中。
我们以下面的程式码做说明
var observable = Rx.Observable
.create(function (observer) {
observer.next('Jerry');
observer.next('Anna');
})
observable.subscribe({
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
})
像上面这段程式,他的行为比较像这样
function subscribe(observer) {
observer.next('Jerry');
observer.next('Anna');
}
subscribe({
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
});
这里可以看到subscribe 是一个function,这个function 执行时会传入观察者,而我们在这个function 内部去执行观察者的方法。
订阅一个Observable 就像是执行一个function
今日小结
今天在讲关于建立Observable的实例,用到了create
的方法,但大部分的内容还是在讲Observable几个重要的观念,如下
Observable可以同时处理同步跟非同步行为
Observer是一个物件,这个物件具有三个方法,分别是next , error , complete
订阅一个Observable 就像在执行一个function
不知道读者是否有所收获,如果有任何问题或建议,欢迎在下方留言给我,谢谢。