函数式反应式编程
函数式反应式编程(Functional Reactive Programming,FRP)结合了函数式编程和反应式编程,用于处理随时间变化的数据流。
什么是 FRP?
FRP 将数据视为随时间变化的流(Stream),使用函数式编程的方式处理这些流。
// 数据流的概念
// 时间: 0---1---2---3---4---5---
// 数据: 1 2 3 4 5
RxJS 基础
RxJS 是 JavaScript 中最流行的 FRP 库。
import { Observable, from, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, switchMap } from 'rxjs/operators';
// 创建 Observable
const numbers$ = from([1, 2, 3, 4, 5]);
// 订阅
numbers$.subscribe({
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('Done')
});
创建 Observable
import { of, from, interval, fromEvent } from 'rxjs';
// 从值创建
of(1, 2, 3).subscribe(console.log);
// 从数组创建
from([1, 2, 3]).subscribe(console.log);
// 定时发射
interval(1000).subscribe(x => console.log(x)); // 0, 1, 2, ...
// 从事件创建
fromEvent(document, 'click').subscribe(e => console.log(e));
// 自定义创建
const custom$ = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.complete();
});
操作符
转换操作符
import { map, pluck, scan } from 'rxjs/operators';
// map
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 6
// pluck - 提取属性
of({ name: 'John', age: 25 }).pipe(
pluck('name')
).subscribe(console.log); // 'John'
// scan - 累积值
of(1, 2, 3, 4).pipe(
scan((acc, x) => acc + x, 0)
).subscribe(console.log); // 1, 3, 6, 10
过滤操作符
import { filter, take, debounceTime, distinctUntilChanged } from 'rxjs/operators';
// filter
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4
// take - 取前 n 个
interval(1000).pipe(
take(3)
).subscribe(console.log); // 0, 1, 2
// debounceTime
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value)
).subscribe(console.log);
// distinctUntilChanged
of(1, 1, 2, 2, 3, 1).pipe(
distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3, 1
组合操作符
import { merge, combineLatest, concat } from 'rxjs';
// merge - 合并多个流
const stream1$ = interval(1000).pipe(map(x => `A${x}`));
const stream2$ = interval(1500).pipe(map(x => `B${x}`));
merge(stream1$, stream2$).subscribe(console.log);
// combineLatest - 组合最新值
const a$ = interval(1000);
const b$ = interval(1500);
combineLatest([a$, b$]).subscribe(console.log); // [0,0], [1,0], [1,1]...
// switchMap - 切换流
fromEvent(button, 'click').pipe(
switchMap(() => fetchData())
).subscribe(console.log);
实际应用
搜索自动完成
import { fromEvent } from 'rxjs';
import { debounceTime, map, switchMap, distinctUntilChanged } from 'rxjs/operators';
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
map(e => e.target.value),
debounceTime(300),
distinctUntilChanged(),
switchMap(query => searchApi(query))
).subscribe(results => {
displayResults(results);
});
实时数据流
import { webSocket } from 'rxjs/webSocket';
const socket$ = webSocket('wss://api.example.com/stream');
socket$.pipe(
filter(data => data.type === 'price'),
map(data => data.value),
scan((acc, price) => [...acc.slice(-9), price], [])
).subscribe(prices => {
updateChart(prices);
});
拖拽实现
const draggable = document.getElementById('draggable');
const mouseDown$ = fromEvent(draggable, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');
mouseDown$.pipe(
switchMap(() => mouseMove$.pipe(
takeUntil(mouseUp$),
map(e => ({ x: e.clientX, y: e.clientY }))
))
).subscribe(pos => {
draggable.style.left = `${pos.x}px`;
draggable.style.top = `${pos.y}px`;
});
错误处理
import { catchError, retry } from 'rxjs/operators';
fetchData().pipe(
retry(3),
catchError(error => {
console.error('Failed after 3 retries:', error);
return of(defaultData);
})
).subscribe(console.log);
最佳实践
- 及时取消订阅 避免内存泄漏
- 使用 takeUntil 管理订阅生命周期
- 合理使用操作符 避免过度复杂的管道
- 错误处理 始终考虑错误情况