自定義 Observable
Observable 是 RxJS 的核心概念,代表一個可以隨時間發出多個值的資料流。除了使用 RxJS 提供的操作符建立 Observable 外,我們也可以自己建立自定義的 Observable。
建立基本的自定義 Observable
從 rxjs 引入 Observable,並 new 一個實例。Observable 建構式接收一個函數,這個函數會接收一個 subscriber 參數,我們可以透過它來發出值、完成 Observable 或發出錯誤。
import { Observable } from 'rxjs';
customInterval$ = new Observable((subscriber) => {
setInterval(() => {
console.log('Emmitting new value...');
subscriber.next({ message: 'New Value' });
}, 2000);
});
ngOnInit(): void {
this.customInterval$.subscribe({
next: (val) => console.log(val),
});
}
在這個例子中,我們建立了一個每 2 秒發出一個物件的 Observable。透過 subscriber.next()
,我們可以發出新的值給訂閱者。

完成 Observable 流
Observable 可以無限發出值,但在實際應用中,我們通常希望在特定條件下結束資料流。可以透過 subscriber.complete()
來完成 Observable:
customInterval$ = new Observable((subscriber) => {
let timesExcecuted = 0;
const intervalId = setInterval(() => {
if (timesExcecuted > 3) {
clearInterval(intervalId);
subscriber.complete(); // 發出完成事件,自動清理訂閱
return;
}
console.log('Emmitting new value...');
subscriber.next({ message: 'New Value' });
timesExcecuted++;
}, 2000);
});
ngOnInit(): void {
this.customInterval$.subscribe({
next: (val) => console.log(val),
complete: () => console.log('COMPLETED'),
});
}
在這個例子中,Observable 會在發出 4 次值後自動完成。當調用 subscriber.complete()
時,Observable 會通知訂閱者資料流已結束,並自動清理訂閱。

錯誤處理
自定義 Observable 中也可以處理錯誤情況:
customErrorInterval$ = new Observable((subscriber) => {
let count = 0;
const intervalId = setInterval(() => {
if (count === 3) {
subscriber.error(new Error('計數超過 3 次'));
clearInterval(intervalId);
return;
}
subscriber.next(count);
count++;
}, 1000);
});
ngOnInit(): void {
this.customErrorInterval$.subscribe({
next: (val) => console.log(`值: ${val}`),
error: (err) => console.error(`錯誤: ${err.message}`),
complete: () => console.log('完成')
});
}
通過 subscriber.error()
,我們可以在需要時發出錯誤,訂閱者可以在 error
回調中處理這些錯誤。
資源清理
自定義 Observable 時,需要注意正確清理資源以避免記憶體洩漏:
customInterval$ = new Observable((subscriber) => {
console.log('Observable 執行中...');
const intervalId = setInterval(() => {
subscriber.next(new Date().toISOString());
}, 1000);
// 返回清理函數,當 Observable 被取消訂閱或完成時會執行
return () => {
console.log('清理資源...');
clearInterval(intervalId);
};
});
ngOnInit(): void {
const subscription = this.customInterval$.subscribe({
next: (val) => console.log(val)
});
// 5 秒後取消訂閱
setTimeout(() => {
subscription.unsubscribe();
console.log('已取消訂閱');
}, 5000);
}
在建立 Observable 時返回一個清理函數,這個函數會在 Observable 被取消訂閱時自動執行,用來清理資源。
使用場景
自定義 Observable 適用於以下情況:
整合非 RxJS 的非同步 API (如 WebSocket、Geolocation API)
將複雜的事件流程轉換為可組合的 Observable
定義特殊的反應式行為
封裝業務邏輯為可重用的資料流
注意事項
Observable 是冷啟動的,只有被訂閱時才會執行
每個訂閱者都會觸發一次 Observable 函數的執行
請記得適當取消訂閱,特別是在元件銷毀時
返回清理函數是良好實踐,確保資源正確釋放
Last updated