RxJS Operators

什麼是 Operators?

Operators(操作符)是 RxJS 中的核心功能,它們是可以對 Observable 進行各種轉換、過濾、組合或其他操作的函數。通過操作符,我們可以以聲明式的方式處理 Observable 發射的資料流。

使用 pipe() 方法

RxJS 提供了 .pipe() 方法來組合和鏈接多個操作符:

import { interval } from 'rxjs';
import { map } from 'rxjs/operators';  // 較舊版本的 RxJS

// 或在 RxJS 7+ 中可以直接從 rxjs 導入
// import { interval, map } from 'rxjs';

interval(1000)
  .pipe(
    map((val) => val * 2)
  )
  .subscribe({
    next: (val) => console.log(val),  // 輸出: 0, 2, 4, 6, 8...
  });

操作符分類

RxJS 有很多操作符,大致可以分為以下幾類:

1. 創建操作符

用於創建 Observable 的操作符:

import { of, from, interval, timer, fromEvent } from 'rxjs';

// 從固定值創建 Observable
of(1, 2, 3).subscribe(val => console.log(val));  // 輸出: 1, 2, 3

// 從陣列或類陣列物件創建 Observable
from([4, 5, 6]).subscribe(val => console.log(val));  // 輸出: 4, 5, 6

// 創建定時發射的 Observable
interval(1000).subscribe(val => console.log(val));  // 每秒輸出: 0, 1, 2...

// 延遲指定時間後發射值
timer(2000, 1000).subscribe(val => console.log(val));  // 2秒後開始,之後每秒: 0, 1, 2...

// 從事件創建 Observable
const button = document.querySelector('button');
fromEvent(button, 'click').subscribe(() => console.log('按鈕被點擊了!'));

2. 轉換操作符

用於轉換 Observable 發射的值:

import { interval } from 'rxjs';
import { map, scan, buffer, concatMap, mergeMap, switchMap } from 'rxjs/operators';

// map:轉換每個值
interval(1000)
  .pipe(map(val => val * 10))
  .subscribe(val => console.log(val));  // 輸出: 0, 10, 20, 30...

// scan:累積值(類似 reduce,但發射中間結果)
interval(1000)
  .pipe(scan((acc, val) => acc + val, 0))
  .subscribe(val => console.log(val));  // 輸出: 0, 1, 3, 6, 10...

// 各種 flattening 操作符 (concatMap, mergeMap, switchMap) 用於處理高階 Observable

3. 過濾操作符

用於過濾 Observable 發射的值:

import { interval } from 'rxjs';
import { filter, take, skip, distinct, debounceTime, throttleTime } from 'rxjs/operators';

// filter:只保留符合條件的值
interval(1000)
  .pipe(filter(val => val % 2 === 0))
  .subscribe(val => console.log(val));  // 輸出: 0, 2, 4, 6...

// take:只取前N個值
interval(1000)
  .pipe(take(5))
  .subscribe(val => console.log(val));  // 輸出: 0, 1, 2, 3, 4 然後完成

// skip:跳過前N個值
interval(1000)
  .pipe(skip(2))
  .subscribe(val => console.log(val));  // 輸出: 2, 3, 4...(跳過前兩個)

// debounceTime:在指定時間內無新值時才發射最新值(常用於搜尋框)
fromEvent(searchInput, 'input')
  .pipe(
    map(event => event.target.value),
    debounceTime(300)
  )
  .subscribe(val => console.log(`搜索: ${val}`));

4. 組合操作符

用於組合多個 Observable:

import { interval, merge, combineLatest, concat, forkJoin, zip } from 'rxjs';
import { startWith } from 'rxjs/operators';

const first$ = interval(1000).pipe(map(v => `First: ${v}`));
const second$ = interval(1500).pipe(map(v => `Second: ${v}`));

// merge:合併多個 Observable 的輸出
merge(first$, second$).subscribe(val => console.log(val));

// combineLatest:當任一源 Observable 發射值時,發射所有源的最新值
combineLatest([first$, second$])
  .subscribe(([first, second]) => console.log(`組合: ${first}, ${second}`));

// startWith:在源 Observable 前添加一個初始值
interval(1000)
  .pipe(startWith(-1))
  .subscribe(val => console.log(val));  // 輸出: -1, 0, 1, 2...

5. 錯誤處理操作符

用於處理 Observable 中的錯誤:

import { of, throwError } from 'rxjs';
import { catchError, retry, retryWhen, timeout } from 'rxjs/operators';

// catchError:捕獲錯誤並提供恢復策略
fetchData().pipe(
  catchError(error => {
    console.log('發生錯誤:', error);
    return of([]);  // 返回空陣列作為後備
  })
).subscribe(data => console.log(data));

// retry:在發生錯誤時重試
fetchData().pipe(
  retry(3)  // 最多重試3次
).subscribe(data => console.log(data));

實用的操作符組合

在實際應用中,我們通常會組合多個操作符來處理更複雜的邏輯:

表單輸入防抖與過濾

import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, filter, map } from 'rxjs/operators';

const searchInput = document.getElementById('search');

fromEvent(searchInput, 'input').pipe(
  map((event: Event) => (event.target as HTMLInputElement).value),
  filter(text => text.length > 2),  // 只有超過2個字符才處理
  debounceTime(300),  // 等待用戶停止輸入300ms
  distinctUntilChanged()  // 只在值變化時發射
).subscribe(value => {
  console.log(`搜索: ${value}`);
  // 執行搜索邏輯...
});

HTTP請求與重試

import { HttpClient } from '@angular/common/http';
import { catchError, retry, switchMap } from 'rxjs/operators';
import { throwError, timer } from 'rxjs';

// 指數退避重試策略
function exponentialBackoffRetry(maxRetries = 3, initialDelay = 1000) {
  return (errors) => {
    return errors.pipe(
      switchMap((error, index) => {
        if (index >= maxRetries) {
          return throwError(() => error);
        }
        const delay = initialDelay * Math.pow(2, index);
        console.log(`重試 #${index + 1} 將在 ${delay}ms 後進行`);
        return timer(delay);
      })
    );
  };
}

this.http.get('api/data').pipe(
  retryWhen(exponentialBackoffRetry(3, 1000)),
  catchError(error => {
    console.error('所有重試都失敗了:', error);
    return throwError(() => new Error('無法取得資料'));
  })
).subscribe({
  next: data => console.log('資料:', data),
  error: err => console.error('發生錯誤:', err)
});

建立自定義操作符

當內建操作符不足以滿足你的需求時,你可以創建自定義操作符:

import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

// 創建一個簡單的記錄操作符
function debug(prefix: string) {
  return <T>(source: Observable<T>) => {
    return source.pipe(
      tap({
        next: value => console.log(`${prefix} Next:`, value),
        error: error => console.log(`${prefix} Error:`, error),
        complete: () => console.log(`${prefix} Complete`)
      })
    );
  };
}

// 使用自定義操作符
interval(1000).pipe(
  debug('計時器'),
  map(val => val * 2),
  debug('轉換後')
).subscribe();

最佳實踐

  1. 使用適當的操作符:了解每個操作符的作用,選擇最適合你需求的

  2. 避免巢狀訂閱:使用 flattening 操作符(switchMap, mergeMap等)而不是嵌套 subscribe

  3. 處理錯誤:始終處理潛在的錯誤情況

  4. 及時清理訂閱:使用 takeUntil 或 takeUntilDestroyed 等操作符管理訂閱生命週期

  5. 遵循函數式編程原則:盡量讓你的操作是純函數,避免副作用

// 推薦寫法:使用 pipe 和適當的操作符
source$.pipe(
  map(data => processData(data)),
  filter(result => result !== null),
  catchError(error => handleError(error))
).subscribe(result => displayResult(result));

// 避免這種寫法:巢狀訂閱
source$.subscribe(data => {
  const processed = processData(data);
  if (processed !== null) {
    displayResult(processed);
  }
});

Last updated