自定義 Observable

Observable 是 RxJS 的核心概念,代表一個可以隨時間發出多個值的資料流。除了使用 RxJS 提供的操作符建立 Observable 外,我們也可以自己建立自定義的 Observable。

建立基本的自定義 Observable

從 rxjs 引入 Observable,並 new 一個實例。Observable 建構式接收一個函數,這個函數會接收一個 subscriber 參數,我們可以透過它來發出值、完成 Observable 或發出錯誤。

import { Observable } from 'rxjs';

customInterval$ = new Observable((subscriber) => {
  setInterval(() => {
    console.log('Emmitting new value...');
    subscriber.next({ message: 'New Value' });
  }, 2000);
});

ngOnInit(): void {
  this.customInterval$.subscribe({
    next: (val) => console.log(val),
  });
}

在這個例子中,我們建立了一個每 2 秒發出一個物件的 Observable。透過 subscriber.next(),我們可以發出新的值給訂閱者。

完成 Observable 流

Observable 可以無限發出值,但在實際應用中,我們通常希望在特定條件下結束資料流。可以透過 subscriber.complete() 來完成 Observable:

customInterval$ = new Observable((subscriber) => {
  let timesExcecuted = 0;
  const intervalId = setInterval(() => {
    if (timesExcecuted > 3) {
      clearInterval(intervalId);
      subscriber.complete(); // 發出完成事件,自動清理訂閱
      return;
    }
    console.log('Emmitting new value...');
    subscriber.next({ message: 'New Value' });
    timesExcecuted++;
  }, 2000);
});

ngOnInit(): void {
  this.customInterval$.subscribe({
    next: (val) => console.log(val),
    complete: () => console.log('COMPLETED'),
  });
}

在這個例子中,Observable 會在發出 4 次值後自動完成。當調用 subscriber.complete() 時,Observable 會通知訂閱者資料流已結束,並自動清理訂閱。

錯誤處理

自定義 Observable 中也可以處理錯誤情況:

customErrorInterval$ = new Observable((subscriber) => {
  let count = 0;
  const intervalId = setInterval(() => {
    if (count === 3) {
      subscriber.error(new Error('計數超過 3 次'));
      clearInterval(intervalId);
      return;
    }
    subscriber.next(count);
    count++;
  }, 1000);
});

ngOnInit(): void {
  this.customErrorInterval$.subscribe({
    next: (val) => console.log(`值: ${val}`),
    error: (err) => console.error(`錯誤: ${err.message}`),
    complete: () => console.log('完成')
  });
}

通過 subscriber.error(),我們可以在需要時發出錯誤,訂閱者可以在 error 回調中處理這些錯誤。

資源清理

自定義 Observable 時,需要注意正確清理資源以避免記憶體洩漏:

customInterval$ = new Observable((subscriber) => {
  console.log('Observable 執行中...');
  const intervalId = setInterval(() => {
    subscriber.next(new Date().toISOString());
  }, 1000);
  
  // 返回清理函數,當 Observable 被取消訂閱或完成時會執行
  return () => {
    console.log('清理資源...');
    clearInterval(intervalId);
  };
});

ngOnInit(): void {
  const subscription = this.customInterval$.subscribe({
    next: (val) => console.log(val)
  });
  
  // 5 秒後取消訂閱
  setTimeout(() => {
    subscription.unsubscribe();
    console.log('已取消訂閱');
  }, 5000);
}

在建立 Observable 時返回一個清理函數,這個函數會在 Observable 被取消訂閱時自動執行,用來清理資源。

使用場景

自定義 Observable 適用於以下情況:

  1. 整合非 RxJS 的非同步 API (如 WebSocket、Geolocation API)

  2. 將複雜的事件流程轉換為可組合的 Observable

  3. 定義特殊的反應式行為

  4. 封裝業務邏輯為可重用的資料流

注意事項

  • Observable 是冷啟動的,只有被訂閱時才會執行

  • 每個訂閱者都會觸發一次 Observable 函數的執行

  • 請記得適當取消訂閱,特別是在元件銷毀時

  • 返回清理函數是良好實踐,確保資源正確釋放

Last updated