跳到主要内容

函数式反应式编程

函数式反应式编程(Functional Reactive Programming,FRP)结合了函数式编程和反应式编程,用于处理随时间变化的数据流。

什么是 FRP?

FRP 将数据视为随时间变化的流(Stream),使用函数式编程的方式处理这些流。

// 数据流的概念
// 时间: 0---1---2---3---4---5---
// 数据: 1 2 3 4 5

RxJS 基础

RxJS 是 JavaScript 中最流行的 FRP 库。

import { Observable, from, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, switchMap } from 'rxjs/operators';

// 创建 Observable
const numbers$ = from([1, 2, 3, 4, 5]);

// 订阅
numbers$.subscribe({
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('Done')
});

创建 Observable

import { of, from, interval, fromEvent } from 'rxjs';

// 从值创建
of(1, 2, 3).subscribe(console.log);

// 从数组创建
from([1, 2, 3]).subscribe(console.log);

// 定时发射
interval(1000).subscribe(x => console.log(x)); // 0, 1, 2, ...

// 从事件创建
fromEvent(document, 'click').subscribe(e => console.log(e));

// 自定义创建
const custom$ = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.complete();
});

操作符

转换操作符

import { map, pluck, scan } from 'rxjs/operators';

// map
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 6

// pluck - 提取属性
of({ name: 'John', age: 25 }).pipe(
pluck('name')
).subscribe(console.log); // 'John'

// scan - 累积值
of(1, 2, 3, 4).pipe(
scan((acc, x) => acc + x, 0)
).subscribe(console.log); // 1, 3, 6, 10

过滤操作符

import { filter, take, debounceTime, distinctUntilChanged } from 'rxjs/operators';

// filter
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4

// take - 取前 n 个
interval(1000).pipe(
take(3)
).subscribe(console.log); // 0, 1, 2

// debounceTime
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value)
).subscribe(console.log);

// distinctUntilChanged
of(1, 1, 2, 2, 3, 1).pipe(
distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3, 1

组合操作符

import { merge, combineLatest, concat } from 'rxjs';

// merge - 合并多个流
const stream1$ = interval(1000).pipe(map(x => `A${x}`));
const stream2$ = interval(1500).pipe(map(x => `B${x}`));
merge(stream1$, stream2$).subscribe(console.log);

// combineLatest - 组合最新值
const a$ = interval(1000);
const b$ = interval(1500);
combineLatest([a$, b$]).subscribe(console.log); // [0,0], [1,0], [1,1]...

// switchMap - 切换流
fromEvent(button, 'click').pipe(
switchMap(() => fetchData())
).subscribe(console.log);

实际应用

搜索自动完成

import { fromEvent } from 'rxjs';
import { debounceTime, map, switchMap, distinctUntilChanged } from 'rxjs/operators';

const searchInput = document.getElementById('search');

fromEvent(searchInput, 'input').pipe(
map(e => e.target.value),
debounceTime(300),
distinctUntilChanged(),
switchMap(query => searchApi(query))
).subscribe(results => {
displayResults(results);
});

实时数据流

import { webSocket } from 'rxjs/webSocket';

const socket$ = webSocket('wss://api.example.com/stream');

socket$.pipe(
filter(data => data.type === 'price'),
map(data => data.value),
scan((acc, price) => [...acc.slice(-9), price], [])
).subscribe(prices => {
updateChart(prices);
});

拖拽实现

const draggable = document.getElementById('draggable');

const mouseDown$ = fromEvent(draggable, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');

mouseDown$.pipe(
switchMap(() => mouseMove$.pipe(
takeUntil(mouseUp$),
map(e => ({ x: e.clientX, y: e.clientY }))
))
).subscribe(pos => {
draggable.style.left = `${pos.x}px`;
draggable.style.top = `${pos.y}px`;
});

错误处理

import { catchError, retry } from 'rxjs/operators';

fetchData().pipe(
retry(3),
catchError(error => {
console.error('Failed after 3 retries:', error);
return of(defaultData);
})
).subscribe(console.log);

最佳实践

  1. 及时取消订阅 避免内存泄漏
  2. 使用 takeUntil 管理订阅生命周期
  3. 合理使用操作符 避免过度复杂的管道
  4. 错误处理 始终考虑错误情况