RxJS & Observable
RxJS 與 Observable
RxJS 基本概念
RxJS (Reactive Extensions for JavaScript) 是一個處理非同步程式設計的函式庫,不是 Angular 獨有的,可以在多種 JavaScript 環境中使用。它提供了基於 Observable 模式的強大工具集,用於處理事件、非同步操作和資料流。
官方文件:https://rxjs.dev/api
Observable
Observable 是 RxJS 的核心概念,它是一個能夠隨時間產生和控制資料流的物件。
Observable 的基本概念
可以發射多個值(資料流)
可以在任何時間點發射值
可以正常完成或因錯誤而終止
Observable 的特性
多值:不像 Promise 只能解析一個值,Observable 可以隨時間發射多個值
惰性執行:Observable 在被訂閱前不會執行其邏輯
可取消:訂閱可以隨時取消,停止接收值並釋放資源
訂閱 Observable
Observable 在被訂閱 (subscribe) 之前不會執行其產生資料的邏輯。這是 RxJS 的「惰性執行」特性。
import { interval } from 'rxjs';
// 創建一個每秒發射遞增數字的 Observable
const numbers$ = interval(1000);
// 如果沒有訂閱,上面的程式碼不會產生任何效果
// Observable 只有在被訂閱時才會開始執行
Observer 物件
訂閱 Observable 時,我們提供一個 observer 物件,它可以實現最多三種方法:
next:
(value) => {}
- 每當 Observable 發射新值時調用error:
(error) => {}
- 當 Observable 發生錯誤時調用complete:
() => {}
- 當 Observable 正常完成時調用
numbers$.subscribe({
next: (value) => console.log(`接收到的值: ${value}`),
error: (error) => console.error(`發生錯誤: ${error}`),
complete: () => console.log('Observable 完成')
});
實用範例:interval
interval
是 RxJS 提供的一個用於建立按時間間隔發射值的 Observable 的函數:
import { Component, DestroyRef, OnInit, inject } from '@angular/core';
import { interval } from 'rxjs';
@Component({
selector: 'app-root',
standalone: true,
templateUrl: './app.component.html',
})
export class AppComponent implements OnInit {
private destroyRef = inject(DestroyRef);
ngOnInit(): void {
// 創建一個每秒發射一個遞增數字的 Observable
const subscription = interval(1000).subscribe({
next: (val) => console.log(val),
// interval 不會自己完成,所以這裡沒有處理 complete
// 也不會拋出錯誤,所以沒有處理 error
});
// 重要:清理訂閱以避免記憶體洩漏
this.destroyRef.onDestroy(() => {
subscription.unsubscribe();
});
}
}
訂閱管理與清理
管理訂閱並在不再需要時取消它們是非常重要的,否則可能導致記憶體洩漏。
基本的清理方式
// 將 subscribe() 返回的 Subscription 物件保存下來
const subscription = observable$.subscribe(...);
// 在組件銷毀時取消訂閱
ngOnDestroy() {
subscription.unsubscribe();
}
使用 DestroyRef (Angular 14+)
private destroyRef = inject(DestroyRef);
ngOnInit() {
const subscription = observable$.subscribe(...);
this.destroyRef.onDestroy(() => {
subscription.unsubscribe();
});
}
使用 takeUntilDestroyed (Angular 16+)
最現代的訂閱管理方式,不需要手動調用 unsubscribe:
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
constructor() {
interval(1000)
.pipe(takeUntilDestroyed())
.subscribe(val => console.log(val));
}
RxJS 操作符
RxJS 提供了豐富的操作符來處理 Observable,這些操作符可以通過 pipe()
方法串聯使用。
import { interval } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';
interval(1000).pipe(
filter(value => value % 2 === 0), // 只保留偶數
map(value => value * 10), // 將每個值乘以 10
take(5) // 只取前 5 個值然後完成
).subscribe({
next: value => console.log(value),
complete: () => console.log('完成!')
});
常用操作符
map: 轉換每個發射的值
filter: 過濾符合條件的值
take: 只取指定數量的值
debounceTime: 在指定時間內無新值時才發射最新值
switchMap: 將每個源值映射到新的 Observable,並只保持最新的內部訂閱
catchError: 捕獲錯誤並提供恢復策略
實用場景
處理 HTTP 請求
import { HttpClient } from '@angular/common/http';
import { catchError, retry } from 'rxjs/operators';
import { throwError } from 'rxjs';
constructor(private http: HttpClient) {}
fetchData() {
return this.http.get<any[]>('api/data').pipe(
retry(3), // 失敗時最多重試 3 次
catchError(error => {
console.error('發生錯誤:', error);
return throwError(() => new Error('資料請求失敗'));
})
);
}
表單輸入防抖
import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';
ngAfterViewInit() {
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
map((event: Event) => (event.target as HTMLInputElement).value),
debounceTime(300) // 等待用戶停止輸入 300ms 後再處理
).subscribe(value => {
this.search(value);
});
}
總結
RxJS 是一個強大的非同步程式設計工具,它的 Observable 模式提供了很大的靈活性。在 Angular 應用中,它被廣泛用於處理 HTTP 請求、表單輸入、事件處理等場景。正確管理訂閱和利用適當的操作符可以讓你的程式碼更加簡潔、可讀,同時避免常見的記憶體洩漏問題。
Last updated