RxJS Operators
什麼是 Operators?
Operators(操作符)是 RxJS 中的核心功能,它們是可以對 Observable 進行各種轉換、過濾、組合或其他操作的函數。通過操作符,我們可以以聲明式的方式處理 Observable 發射的資料流。
使用 pipe() 方法
RxJS 提供了 .pipe()
方法來組合和鏈接多個操作符:
import { interval } from 'rxjs';
import { map } from 'rxjs/operators'; // 較舊版本的 RxJS
// 或在 RxJS 7+ 中可以直接從 rxjs 導入
// import { interval, map } from 'rxjs';
interval(1000)
.pipe(
map((val) => val * 2)
)
.subscribe({
next: (val) => console.log(val), // 輸出: 0, 2, 4, 6, 8...
});
操作符分類
RxJS 有很多操作符,大致可以分為以下幾類:
1. 創建操作符
用於創建 Observable 的操作符:
import { of, from, interval, timer, fromEvent } from 'rxjs';
// 從固定值創建 Observable
of(1, 2, 3).subscribe(val => console.log(val)); // 輸出: 1, 2, 3
// 從陣列或類陣列物件創建 Observable
from([4, 5, 6]).subscribe(val => console.log(val)); // 輸出: 4, 5, 6
// 創建定時發射的 Observable
interval(1000).subscribe(val => console.log(val)); // 每秒輸出: 0, 1, 2...
// 延遲指定時間後發射值
timer(2000, 1000).subscribe(val => console.log(val)); // 2秒後開始,之後每秒: 0, 1, 2...
// 從事件創建 Observable
const button = document.querySelector('button');
fromEvent(button, 'click').subscribe(() => console.log('按鈕被點擊了!'));
2. 轉換操作符
用於轉換 Observable 發射的值:
import { interval } from 'rxjs';
import { map, scan, buffer, concatMap, mergeMap, switchMap } from 'rxjs/operators';
// map:轉換每個值
interval(1000)
.pipe(map(val => val * 10))
.subscribe(val => console.log(val)); // 輸出: 0, 10, 20, 30...
// scan:累積值(類似 reduce,但發射中間結果)
interval(1000)
.pipe(scan((acc, val) => acc + val, 0))
.subscribe(val => console.log(val)); // 輸出: 0, 1, 3, 6, 10...
// 各種 flattening 操作符 (concatMap, mergeMap, switchMap) 用於處理高階 Observable
3. 過濾操作符
用於過濾 Observable 發射的值:
import { interval } from 'rxjs';
import { filter, take, skip, distinct, debounceTime, throttleTime } from 'rxjs/operators';
// filter:只保留符合條件的值
interval(1000)
.pipe(filter(val => val % 2 === 0))
.subscribe(val => console.log(val)); // 輸出: 0, 2, 4, 6...
// take:只取前N個值
interval(1000)
.pipe(take(5))
.subscribe(val => console.log(val)); // 輸出: 0, 1, 2, 3, 4 然後完成
// skip:跳過前N個值
interval(1000)
.pipe(skip(2))
.subscribe(val => console.log(val)); // 輸出: 2, 3, 4...(跳過前兩個)
// debounceTime:在指定時間內無新值時才發射最新值(常用於搜尋框)
fromEvent(searchInput, 'input')
.pipe(
map(event => event.target.value),
debounceTime(300)
)
.subscribe(val => console.log(`搜索: ${val}`));
4. 組合操作符
用於組合多個 Observable:
import { interval, merge, combineLatest, concat, forkJoin, zip } from 'rxjs';
import { startWith } from 'rxjs/operators';
const first$ = interval(1000).pipe(map(v => `First: ${v}`));
const second$ = interval(1500).pipe(map(v => `Second: ${v}`));
// merge:合併多個 Observable 的輸出
merge(first$, second$).subscribe(val => console.log(val));
// combineLatest:當任一源 Observable 發射值時,發射所有源的最新值
combineLatest([first$, second$])
.subscribe(([first, second]) => console.log(`組合: ${first}, ${second}`));
// startWith:在源 Observable 前添加一個初始值
interval(1000)
.pipe(startWith(-1))
.subscribe(val => console.log(val)); // 輸出: -1, 0, 1, 2...
5. 錯誤處理操作符
用於處理 Observable 中的錯誤:
import { of, throwError } from 'rxjs';
import { catchError, retry, retryWhen, timeout } from 'rxjs/operators';
// catchError:捕獲錯誤並提供恢復策略
fetchData().pipe(
catchError(error => {
console.log('發生錯誤:', error);
return of([]); // 返回空陣列作為後備
})
).subscribe(data => console.log(data));
// retry:在發生錯誤時重試
fetchData().pipe(
retry(3) // 最多重試3次
).subscribe(data => console.log(data));
實用的操作符組合
在實際應用中,我們通常會組合多個操作符來處理更複雜的邏輯:
表單輸入防抖與過濾
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, filter, map } from 'rxjs/operators';
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
map((event: Event) => (event.target as HTMLInputElement).value),
filter(text => text.length > 2), // 只有超過2個字符才處理
debounceTime(300), // 等待用戶停止輸入300ms
distinctUntilChanged() // 只在值變化時發射
).subscribe(value => {
console.log(`搜索: ${value}`);
// 執行搜索邏輯...
});
HTTP請求與重試
import { HttpClient } from '@angular/common/http';
import { catchError, retry, switchMap } from 'rxjs/operators';
import { throwError, timer } from 'rxjs';
// 指數退避重試策略
function exponentialBackoffRetry(maxRetries = 3, initialDelay = 1000) {
return (errors) => {
return errors.pipe(
switchMap((error, index) => {
if (index >= maxRetries) {
return throwError(() => error);
}
const delay = initialDelay * Math.pow(2, index);
console.log(`重試 #${index + 1} 將在 ${delay}ms 後進行`);
return timer(delay);
})
);
};
}
this.http.get('api/data').pipe(
retryWhen(exponentialBackoffRetry(3, 1000)),
catchError(error => {
console.error('所有重試都失敗了:', error);
return throwError(() => new Error('無法取得資料'));
})
).subscribe({
next: data => console.log('資料:', data),
error: err => console.error('發生錯誤:', err)
});
建立自定義操作符
當內建操作符不足以滿足你的需求時,你可以創建自定義操作符:
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
// 創建一個簡單的記錄操作符
function debug(prefix: string) {
return <T>(source: Observable<T>) => {
return source.pipe(
tap({
next: value => console.log(`${prefix} Next:`, value),
error: error => console.log(`${prefix} Error:`, error),
complete: () => console.log(`${prefix} Complete`)
})
);
};
}
// 使用自定義操作符
interval(1000).pipe(
debug('計時器'),
map(val => val * 2),
debug('轉換後')
).subscribe();
最佳實踐
使用適當的操作符:了解每個操作符的作用,選擇最適合你需求的
避免巢狀訂閱:使用 flattening 操作符(switchMap, mergeMap等)而不是嵌套 subscribe
處理錯誤:始終處理潛在的錯誤情況
及時清理訂閱:使用 takeUntil 或 takeUntilDestroyed 等操作符管理訂閱生命週期
遵循函數式編程原則:盡量讓你的操作是純函數,避免副作用
// 推薦寫法:使用 pipe 和適當的操作符
source$.pipe(
map(data => processData(data)),
filter(result => result !== null),
catchError(error => handleError(error))
).subscribe(result => displayResult(result));
// 避免這種寫法:巢狀訂閱
source$.subscribe(data => {
const processed = processData(data);
if (processed !== null) {
displayResult(processed);
}
});
Last updated