RxJS & Observable

RxJS 與 Observable

RxJS 基本概念

RxJS (Reactive Extensions for JavaScript) 是一個處理非同步程式設計的函式庫,不是 Angular 獨有的,可以在多種 JavaScript 環境中使用。它提供了基於 Observable 模式的強大工具集,用於處理事件、非同步操作和資料流。

官方文件:https://rxjs.dev/api

Observable

Observable 是 RxJS 的核心概念,它是一個能夠隨時間產生和控制資料流的物件。

Observable 的基本概念

  • 可以發射多個值(資料流)

  • 可以在任何時間點發射值

  • 可以正常完成或因錯誤而終止

Observable 的特性

  • 多值:不像 Promise 只能解析一個值,Observable 可以隨時間發射多個值

  • 惰性執行:Observable 在被訂閱前不會執行其邏輯

  • 可取消:訂閱可以隨時取消,停止接收值並釋放資源

訂閱 Observable

Observable 在被訂閱 (subscribe) 之前不會執行其產生資料的邏輯。這是 RxJS 的「惰性執行」特性。

import { interval } from 'rxjs';

// 創建一個每秒發射遞增數字的 Observable
const numbers$ = interval(1000);

// 如果沒有訂閱,上面的程式碼不會產生任何效果
// Observable 只有在被訂閱時才會開始執行

Observer 物件

訂閱 Observable 時,我們提供一個 observer 物件,它可以實現最多三種方法:

  1. next: (value) => {} - 每當 Observable 發射新值時調用

  2. error: (error) => {} - 當 Observable 發生錯誤時調用

  3. complete: () => {} - 當 Observable 正常完成時調用

numbers$.subscribe({
  next: (value) => console.log(`接收到的值: ${value}`),
  error: (error) => console.error(`發生錯誤: ${error}`),
  complete: () => console.log('Observable 完成')
});

實用範例:interval

interval 是 RxJS 提供的一個用於建立按時間間隔發射值的 Observable 的函數:

import { Component, DestroyRef, OnInit, inject } from '@angular/core';
import { interval } from 'rxjs';

@Component({
  selector: 'app-root',
  standalone: true,
  templateUrl: './app.component.html',
})
export class AppComponent implements OnInit {
  private destroyRef = inject(DestroyRef);
  
  ngOnInit(): void {
    // 創建一個每秒發射一個遞增數字的 Observable
    const subscription = interval(1000).subscribe({
      next: (val) => console.log(val),
      // interval 不會自己完成,所以這裡沒有處理 complete
      // 也不會拋出錯誤,所以沒有處理 error
    });
    
    // 重要:清理訂閱以避免記憶體洩漏
    this.destroyRef.onDestroy(() => {
      subscription.unsubscribe();
    });
  }
}

訂閱管理與清理

管理訂閱並在不再需要時取消它們是非常重要的,否則可能導致記憶體洩漏。

基本的清理方式

// 將 subscribe() 返回的 Subscription 物件保存下來
const subscription = observable$.subscribe(...);

// 在組件銷毀時取消訂閱
ngOnDestroy() {
  subscription.unsubscribe();
}

使用 DestroyRef (Angular 14+)

private destroyRef = inject(DestroyRef);

ngOnInit() {
  const subscription = observable$.subscribe(...);
  
  this.destroyRef.onDestroy(() => {
    subscription.unsubscribe();
  });
}

使用 takeUntilDestroyed (Angular 16+)

最現代的訂閱管理方式,不需要手動調用 unsubscribe:

import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

constructor() {
  interval(1000)
    .pipe(takeUntilDestroyed())
    .subscribe(val => console.log(val));
}

RxJS 操作符

RxJS 提供了豐富的操作符來處理 Observable,這些操作符可以通過 pipe() 方法串聯使用。

import { interval } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';

interval(1000).pipe(
  filter(value => value % 2 === 0), // 只保留偶數
  map(value => value * 10),         // 將每個值乘以 10
  take(5)                          // 只取前 5 個值然後完成
).subscribe({
  next: value => console.log(value),
  complete: () => console.log('完成!')
});

常用操作符

  • map: 轉換每個發射的值

  • filter: 過濾符合條件的值

  • take: 只取指定數量的值

  • debounceTime: 在指定時間內無新值時才發射最新值

  • switchMap: 將每個源值映射到新的 Observable,並只保持最新的內部訂閱

  • catchError: 捕獲錯誤並提供恢復策略

實用場景

處理 HTTP 請求

import { HttpClient } from '@angular/common/http';
import { catchError, retry } from 'rxjs/operators';
import { throwError } from 'rxjs';

constructor(private http: HttpClient) {}

fetchData() {
  return this.http.get<any[]>('api/data').pipe(
    retry(3), // 失敗時最多重試 3 次
    catchError(error => {
      console.error('發生錯誤:', error);
      return throwError(() => new Error('資料請求失敗'));
    })
  );
}

表單輸入防抖

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

ngAfterViewInit() {
  const searchInput = document.getElementById('search');
  
  fromEvent(searchInput, 'input').pipe(
    map((event: Event) => (event.target as HTMLInputElement).value),
    debounceTime(300) // 等待用戶停止輸入 300ms 後再處理
  ).subscribe(value => {
    this.search(value);
  });
}

總結

RxJS 是一個強大的非同步程式設計工具,它的 Observable 模式提供了很大的靈活性。在 Angular 應用中,它被廣泛用於處理 HTTP 請求、表單輸入、事件處理等場景。正確管理訂閱和利用適當的操作符可以讓你的程式碼更加簡潔、可讀,同時避免常見的記憶體洩漏問題。

Last updated