函数式反应式编程
函数式反应式编程(Functional Reactive Programming,FRP)是一种编程范式,它结合了函数式编程和反应式编程的思想。在现代前端开发中,FRP 已经成为处理异步数据流的核心技术,Angular 内置使用 RxJS,React 生态中也广泛应用。
什么是 FRP?
FRP 将数据视为随时间变化的"流"(Stream),使用函数式编程的方式来处理这些流。理解 FRP 需要掌握两个核心概念:
反应式编程
反应式编程是一种面向数据流和变化传播的编程范式。简单来说,当数据源发生变化时,所有依赖这个数据的地方都会自动更新。
// 传统命令式编程
let a = 1;
let b = 2;
let c = a + b; // c = 3
a = 10; // a 变了
// c 还是 3,不会自动更新
// 反应式编程的概念(伪代码)
// a: 1 → 10
// b: 2
// c = a + b 自动变为 12
函数式编程
函数式编程强调纯函数、不可变性和函数组合。FRP 中的操作符都是纯函数,它们接收一个流,返回一个新流,不修改原始流。
// 函数式:通过纯函数组合处理数据流
const result = stream
.filter(x => x > 0) // 过滤:纯函数
.map(x => x * 2) // 转换:纯函数
.reduce((a, b) => a + b); // 归约:纯函数
数据流的概念
想象一条时间轴,数据沿着这条轴流动:
时间: ----t1----t2----t3----t4----t5---->
数据: 1 2 3 4 5
这是一个发出 1, 2, 3, 4, 5 的数据流
数据流可以是:
- 用户输入(点击、键盘、鼠标移动)
- 服务器响应(HTTP 请求)
- 定时器事件
- WebSocket 消息
RxJS 基础
RxJS 是 JavaScript 中最流行的 FRP 库,Angular 的 HTTP 模块和表单模块都基于 RxJS 构建。
Observable:可观察对象
Observable 是 RxJS 的核心概念,它代表一个可观察的数据流。根据 RxJS 官方文档,Observable 是"惰性的推送集合,可以包含多个值"。
理解 Observable 的关键点:
| 特性 | Function | Promise | Observable |
|---|---|---|---|
| 调用方式 | 拉取 | 推送 | 推送 |
| 返回值数量 | 单个 | 单个 | 零到多个 |
| 执行时机 | 同步 | 异步 | 同步或异步 |
| 是否惰性 | 是 | 否 | 是 |
import { Observable } from 'rxjs';
// 创建 Observable
const observable = new Observable(subscriber => {
// 同步发送 1, 2, 3
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
// 异步发送 4
setTimeout(() => {
subscriber.next(4);
subscriber.complete(); // 完成
}, 1000);
});
// 订阅 Observable
observable.subscribe({
next: value => console.log('收到值:', value),
error: err => console.error('错误:', err),
complete: () => console.log('完成')
});
// 输出:
// 收到值: 1
// 收到值: 2
// 收到值: 3
// (等待 1 秒)
// 收到值: 4
// 完成
Observer:观察者
Observer 是一个对象,定义了如何处理 Observable 发出的通知:
const observer = {
// 处理正常值
next: value => console.log('值:', value),
// 处理错误
error: err => console.error('错误:', err),
// 处理完成
complete: () => console.log('完成')
};
// 订阅时传入 observer
observable.subscribe(observer);
// 也可以只传一个函数,作为 next 处理器
observable.subscribe(value => console.log(value));
Subscription:订阅
Subscription 代表一个可观察对象的执行,主要用于取消订阅:
import { interval } from 'rxjs';
// 每秒发出一个递增数字
const numbers$ = interval(1000);
const subscription = numbers$.subscribe({
next: value => console.log(value)
});
// 5 秒后取消订阅
setTimeout(() => {
subscription.unsubscribe();
console.log('已取消订阅');
}, 5000);
// 输出: 0, 1, 2, 3, 4, 已取消订阅
为什么需要取消订阅?
如果不取消订阅,Observable 会继续执行,可能导致:
- 内存泄漏
- 不必要的网络请求
- 意外的副作用
// React 组件中取消订阅的正确方式
import { useEffect } from 'react';
import { interval } from 'rxjs';
function MyComponent() {
useEffect(() => {
const subscription = interval(1000).subscribe(
value => console.log(value)
);
// 组件卸载时取消订阅
return () => subscription.unsubscribe();
}, []);
return <div>My Component</div>;
}
创建 Observable
RxJS 提供了多种创建 Observable 的方式:
创建操作符
import { of, from, interval, timer, fromEvent, fromPromise } from 'rxjs';
// of: 从一组值创建
of(1, 2, 3).subscribe(console.log);
// 输出: 1, 2, 3
// from: 从数组、Promise、Iterable 创建
from([1, 2, 3]).subscribe(console.log);
// 输出: 1, 2, 3
from(fetch('/api/users')).subscribe(response => {
// 处理响应
});
// interval: 定时发出递增数字
interval(1000).subscribe(console.log);
// 每 1 秒: 0, 1, 2, 3, ...
// timer: 延迟后开始发出值
timer(3000, 1000).subscribe(console.log);
// 3 秒后开始,每 1 秒: 0, 1, 2, 3, ...
// fromEvent: 从 DOM 事件创建
import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(event => {
console.log('点击位置:', event.clientX, event.clientY);
});
fromEvent(document, 'keydown').subscribe(event => {
console.log('按键:', event.key);
});
自定义 Observable
import { Observable } from 'rxjs';
// 创建一个每秒发出当前时间的 Observable
const clock$ = new Observable(subscriber => {
const id = setInterval(() => {
subscriber.next(new Date().toLocaleTimeString());
}, 1000);
// 返回清理函数(取消订阅时执行)
return () => {
clearInterval(id);
console.log('时钟已停止');
};
});
const subscription = clock$.subscribe(console.log);
// 每秒输出当前时间
// 5 秒后停止
setTimeout(() => subscription.unsubscribe(), 5000);
操作符
操作符是 RxJS 的精华所在。它们是纯函数,接收一个 Observable,返回一个新的 Observable。操作符可以分为以下几类:
创建操作符
import { of, from, range, generate, defer, EMPTY, NEVER, throwError } from 'rxjs';
// range: 创建范围内的数字
range(1, 5).subscribe(console.log); // 1, 2, 3, 4, 5
// generate: 创建基于条件的序列
generate(
2, // 初始值
x => x <= 10, // 条件
x => x * 2 // 迭代函数
).subscribe(console.log); // 2, 4, 8
// defer: 延迟创建,每次订阅时重新创建
const random$ = defer(() => of(Math.random()));
random$.subscribe(console.log); // 每次订阅都是新的随机数
// EMPTY: 立即完成的空 Observable
EMPTY.subscribe({
next: () => console.log('next'),
complete: () => console.log('complete')
});
// 只输出: complete
// NEVER: 永不发出值和完成的 Observable
// 常用于测试或表示"等待"状态
// throwError: 立即抛出错误
throwError(() => new Error('出错了')).subscribe({
error: err => console.error(err.message)
});
转换操作符
转换操作符用于改变数据流中的值:
import { of, from } from 'rxjs';
import { map, pluck, scan, buffer, bufferTime, mapTo } from 'rxjs/operators';
// map: 对每个值进行转换
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 6
// pluck: 提取对象的指定属性
const users = [
{ name: '张三', age: 25 },
{ name: '李四', age: 30 }
];
from(users).pipe(
pluck('name')
).subscribe(console.log); // 张三, 李四
// scan: 类似 reduce,但发出每次累积的结果
of(1, 2, 3, 4, 5).pipe(
scan((acc, value) => acc + value, 0)
).subscribe(console.log); // 1, 3, 6, 10, 15
// bufferTime: 收集一段时间内的值
interval(100).pipe(
bufferTime(300)
).subscribe(console.log);
// [0, 1, 2], [3, 4, 5], [6, 7, 8], ...
// mapTo: 将所有值映射为同一个值
fromEvent(document, 'click').pipe(
mapTo('clicked!')
).subscribe(console.log);
// 每次点击都输出 'clicked!'
过滤操作符
过滤操作符用于筛选数据流中的值:
import { of, interval, fromEvent } from 'rxjs';
import { filter, take, takeUntil, takeWhile, skip, first, last, debounceTime, distinctUntilChanged, throttleTime } from 'rxjs/operators';
// filter: 过滤满足条件的值
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4
// take: 只取前 n 个值
interval(1000).pipe(
take(3)
).subscribe(console.log); // 0, 1, 2(然后完成)
// takeUntil: 直到另一个 Observable 发出值才停止
const stop$ = fromEvent(document, 'dblclick');
interval(500).pipe(
takeUntil(stop$)
).subscribe(console.log);
// 双击后停止
// takeWhile: 满足条件时继续
of(1, 2, 3, 4, 5).pipe(
takeWhile(x => x < 4)
).subscribe(console.log); // 1, 2, 3
// skip: 跳过前 n 个值
of(1, 2, 3, 4, 5).pipe(
skip(2)
).subscribe(console.log); // 3, 4, 5
// first/last: 取第一个/最后一个
of(1, 2, 3).pipe(first()).subscribe(console.log); // 1
of(1, 2, 3).pipe(last()).subscribe(console.log); // 3
// debounceTime: 防抖,等待一段时间没有新值才发出
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value)
).subscribe(value => {
console.log('搜索:', value);
// 用户停止输入 300ms 后才触发搜索
});
// distinctUntilChanged: 只在值变化时发出
of(1, 1, 2, 2, 3, 1, 1).pipe(
distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3, 1
// throttleTime: 节流,指定时间内只发出一个值
fromEvent(document, 'click').pipe(
throttleTime(1000)
).subscribe(() => console.log('点击'));
// 1 秒内最多触发一次
组合操作符
组合操作符用于合并多个 Observable:
import { of, interval, fromEvent, merge, combineLatest, concat, zip, forkJoin } from 'rxjs';
import { map, take } from 'rxjs/operators';
// merge: 同时订阅多个 Observable,值按时间顺序发出
const source1$ = interval(1000).pipe(map(x => `A${x}`));
const source2$ = interval(1500).pipe(map(x => `B${x}`));
merge(source1$, source2$).subscribe(console.log);
// A0, B0, A1, A2, B1, A3, B2, ...
// combineLatest: 组合多个 Observable 的最新值
const a$ = interval(1000).pipe(take(3));
const b$ = interval(1500).pipe(take(3));
combineLatest([a$, b$]).subscribe(console.log);
// [0, 0], [1, 0], [1, 1], [2, 1], [2, 2]
// concat: 依次执行,前一个完成后才开始下一个
const first$ = of(1, 2, 3);
const second$ = of(4, 5, 6);
concat(first$, second$).subscribe(console.log);
// 1, 2, 3, 4, 5, 6
// zip: 按索引配对
const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2, 3);
zip(letters$, numbers$).subscribe(console.log);
// ['a', 1], ['b', 2], ['c', 3]
// forkJoin: 等待所有 Observable 完成后,取各自的最后一个值
const req1$ = fetch('/api/users').then(r => r.json());
const req2$ = fetch('/api/posts').then(r => r.json());
forkJoin([from(req1$), from(req2$)]).subscribe(([users, posts]) => {
console.log('用户:', users);
console.log('文章:', posts);
});
高阶映射操作符
这是 RxJS 中最强大的操作符类别,用于处理"Observable 的 Observable":
import { of, fromEvent, interval } from 'rxjs';
import { map, mergeMap, switchMap, concatMap, exhaustMap, take } from 'rxjs/operators';
// 模拟 API 请求
const fetchUser = id =>
new Promise(resolve => setTimeout(() => resolve({ id, name: `用户${id}` }), 1000));
// mergeMap (flatMap): 并行处理所有内部 Observable
fromEvent(button, 'click').pipe(
mergeMap(() => from(fetchUser(1)))
).subscribe(user => console.log(user));
// 每次点击都会发起请求,请求并行执行
// switchMap: 新值到来时,取消之前的内部 Observable
// 最适合搜索框、路由切换等场景
fromEvent(searchInput, 'input').pipe(
debounceTime(300),
map(e => e.target.value),
switchMap(query => from(searchAPI(query)))
).subscribe(results => {
// 只显示最新请求的结果
displayResults(results);
});
// concatMap: 按顺序处理,前一个完成后再处理下一个
fromEvent(button, 'click').pipe(
concatMap(() => from(fetchUser(1)))
).subscribe(user => console.log(user));
// 请求按点击顺序依次执行
// exhaustMap: 忽略正在执行期间的新的 Observable
// 最适合防止重复提交
fromEvent(submitButton, 'click').pipe(
exhaustMap(() => from(submitForm()))
).subscribe(response => {
// 在请求完成前,新的点击会被忽略
console.log('提交成功:', response);
});
// 对比表格
/*
| 操作符 | 行为 | 适用场景 |
|--------|------|----------|
| mergeMap | 并行处理 | 多个请求同时执行 |
| switchMap | 取消前一个 | 搜索、路由切换 |
| concatMap | 顺序执行 | 有顺序要求的操作 |
| exhaustMap | 忽略新请求 | 防止重复提交 |
*/
错误处理操作符
import { of, interval, throwError } from 'rxjs';
import { catchError, retry, retryWhen, delay, take, map } from 'rxjs/operators';
// catchError: 捕获错误并返回备选值或 Observable
const failing$ = throwError(() => new Error('网络错误'));
failing$.pipe(
catchError(error => {
console.error('捕获错误:', error.message);
return of('默认值'); // 返回备选值
})
).subscribe(console.log); // 默认值
// retry: 重试指定次数
fromFetch('/api/data').pipe(
retry(3), // 失败后重试 3 次
catchError(() => of(null))
).subscribe(console.log);
// retryWhen: 自定义重试逻辑
interval(1000).pipe(
map(x => {
if (x > 2) throw new Error('模拟错误');
return x;
}),
retryWhen(errors =>
errors.pipe(
delay(1000), // 延迟 1 秒后重试
take(3) // 最多重试 3 次
)
)
).subscribe(console.log);
实际应用场景
搜索自动完成
这是一个经典的 FRP 应用场景:
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, switchMap, filter, catchError, of } from 'rxjs/operators';
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
// 获取输入值
map(e => e.target.value),
// 过滤空值
filter(query => query.trim().length > 0),
// 防抖:停止输入 300ms 后才执行
debounceTime(300),
// 去重:相同的查询不重复请求
distinctUntilChanged(),
// 切换到最新请求,取消之前的请求
switchMap(query =>
from(searchAPI(query)).pipe(
// 错误处理
catchError(error => {
console.error('搜索失败:', error);
return of([]); // 返回空数组
})
)
)
).subscribe(results => {
displayResults(results);
});
// 搜索 API 封装
async function searchAPI(query) {
const response = await fetch(`/api/search?q=${encodeURIComponent(query)}`);
return response.json();
}
拖拽实现
使用 FRP 实现拖拽更加优雅:
import { fromEvent, merge } from 'rxjs';
import { map, takeUntil, switchMap, filter } from 'rxjs/operators';
const draggable = document.getElementById('draggable');
// 鼠标按下
const mouseDown$ = fromEvent(draggable, 'mousedown');
// 鼠标移动
const mouseMove$ = fromEvent(document, 'mousemove');
// 鼠标松开
const mouseUp$ = fromEvent(document, 'mouseup');
// 拖拽流:按下 -> 移动 -> 松开
mouseDown$.pipe(
// 获取初始位置
map(event => ({
startX: event.clientX,
startY: event.clientY,
startLeft: draggable.offsetLeft,
startTop: draggable.offsetTop
})),
// 切换到移动流
switchMap(({ startX, startY, startLeft, startTop }) =>
mouseMove$.pipe(
// 计算新位置
map(moveEvent => ({
left: startLeft + moveEvent.clientX - startX,
top: startTop + moveEvent.clientY - startY
})),
// 松开时结束
takeUntil(mouseUp$)
)
)
).subscribe(({ left, top }) => {
draggable.style.left = `${left}px`;
draggable.style.top = `${top}px`;
});
实时数据流
处理 WebSocket 数据流:
import { webSocket } from 'rxjs/webSocket';
import { filter, map, scan, retryWhen, delay } from 'rxjs/operators';
// 创建 WebSocket 连接
const socket$ = webSocket({
url: 'wss://api.example.com/realtime',
openObserver: {
next: () => console.log('WebSocket 已连接')
},
closeObserver: {
next: () => console.log('WebSocket 已断开')
}
});
// 订阅股票价格流
socket$.pipe(
// 过滤只关注的价格更新
filter(data => data.type === 'price_update'),
// 提取价格
map(data => ({
symbol: data.symbol,
price: data.price,
timestamp: new Date(data.timestamp)
})),
// 缓存最近 10 条记录
scan((history, update) => [...history.slice(-9), update], []),
// 连接断开时自动重连
retryWhen(errors =>
errors.pipe(
delay(3000) // 3 秒后重连
)
)
).subscribe(prices => {
updatePriceChart(prices);
});
// 发送订阅消息
socket$.next({
action: 'subscribe',
channels: ['AAPL', 'GOOGL', 'MSFT']
});
复杂表单验证
import { fromEvent, combineLatest } from 'rxjs';
import { map, debounceTime, filter, distinctUntilChanged, startWith } from 'rxjs/operators';
// 获取表单元素
const usernameInput = document.getElementById('username');
const emailInput = document.getElementById('email');
const passwordInput = document.getElementById('password');
// 创建输入流
const username$ = fromEvent(usernameInput, 'input').pipe(
map(e => e.target.value),
debounceTime(300),
distinctUntilChanged()
);
const email$ = fromEvent(emailInput, 'input').pipe(
map(e => e.target.value),
debounceTime(300),
distinctUntilChanged()
);
const password$ = fromEvent(passwordInput, 'input').pipe(
map(e => e.target.value),
debounceTime(300),
distinctUntilChanged()
);
// 验证函数
const validateUsername = username =>
username.length >= 3 ? { valid: true } : { valid: false, error: '用户名至少 3 个字符' };
const validateEmail = email =>
/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)
? { valid: true }
: { valid: false, error: '邮箱格式不正确' };
const validatePassword = password =>
password.length >= 8
? { valid: true }
: { valid: false, error: '密码至少 8 个字符' };
// 组合验证
combineLatest([
username$.pipe(map(validateUsername), startWith({ valid: false })),
email$.pipe(map(validateEmail), startWith({ valid: false })),
password$.pipe(map(validatePassword), startWith({ valid: false }))
]).pipe(
map(([username, email, password]) => ({
username,
email,
password,
allValid: username.valid && email.valid && password.valid
}))
).subscribe(validation => {
// 更新 UI
updateValidationUI(validation);
// 启用/禁用提交按钮
submitButton.disabled = !validation.allValid;
});
Subject:多播
Subject 是一种特殊的 Observable,它可以向多个观察者多播值:
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';
// Subject: 基本的多播
const subject = new Subject();
subject.subscribe({
next: v => console.log('观察者A:', v)
});
subject.subscribe({
next: v => console.log('观察者B:', v)
});
subject.next(1); // 观察者A: 1, 观察者B: 1
subject.next(2); // 观察者A: 2, 观察者B: 2
// BehaviorSubject: 记住当前值,新订阅者立即获得当前值
const behaviorSubject = new BehaviorSubject('初始值');
behaviorSubject.subscribe(v => console.log('A:', v)); // A: 初始值
behaviorSubject.next('新值');
behaviorSubject.subscribe(v => console.log('B:', v)); // B: 新值
// ReplaySubject: 记住指定数量的历史值
const replaySubject = new ReplaySubject(2); // 记住最近 2 个值
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.subscribe(v => console.log('新订阅者:', v));
// 新订阅者: 2
// 新订阅者: 3
// AsyncSubject: 只在完成时发出最后一个值
const asyncSubject = new AsyncSubject();
asyncSubject.subscribe(v => console.log('结果:', v));
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.complete(); // 结果: 3
Subject 的实际应用
import { Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';
// 创建一个简单的状态管理器
function createStore(initialState) {
const state$ = new BehaviorSubject(initialState);
return {
// 获取当前状态
getState: () => state$.value,
// 更新状态
setState: updater => {
const newState = typeof updater === 'function'
? updater(state$.value)
: updater;
state$.next(newState);
},
// 订阅状态变化
subscribe: callback => state$.subscribe(callback),
// 选择特定状态片段
select: selector => state$.pipe(
map(selector),
distinctUntilChanged()
)
};
}
// 使用
const store = createStore({ user: null, theme: 'light' });
// 订阅主题变化
store.select(state => state.theme).subscribe(theme => {
document.body.className = theme;
});
// 更新状态
store.setState(state => ({ ...state, theme: 'dark' }));
最佳实践
1. 及时取消订阅
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
// Angular 组件中的最佳实践
@Component({...})
export class MyComponent implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
this.data$.pipe(
takeUntil(this.destroy$) // 组件销毁时自动取消
).subscribe(data => {
// 处理数据
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
2. 避免在 subscribe 中处理复杂逻辑
// 不好:在 subscribe 中处理太多
source$.subscribe(value => {
const processed = complexProcessing(value);
const validated = validate(processed);
if (validated) {
save(validated);
showNotification('保存成功');
}
});
// 好:使用操作符链
source$.pipe(
map(value => complexProcessing(value)),
filter(processed => validate(processed)),
switchMap(validated => save(validated)),
tap(() => showNotification('保存成功'))
).subscribe();
3. 合理使用 Subject 和 Observable
// 使用 Observable(单向数据流)
// 适合:从源到消费者的单向数据流
const data$ = fromEvent(source, 'data');
// 使用 Subject(多播、可观察和可订阅)
// 适合:需要手动推送值,多个订阅者
const eventBus = new Subject();
eventBus.next({ type: 'user-login', data: user });
4. 使用 take 系列操作符控制流的结束
// 使用 take(n) 限制发出值的数量
interval(1000).pipe(
take(5) // 只取前 5 个
).subscribe();
// 使用 takeUntil 在特定条件下结束
interval(1000).pipe(
takeUntil(logout$) // 登出时结束
).subscribe();
// 使用 first() 只取第一个满足条件的值
fromEvent(document, 'click').pipe(
first(event => event.target.matches('.confirm-button'))
).subscribe(() => {
// 处理确认按钮的第一次点击
});
小结
函数式反应式编程是一种强大的编程范式:
- 核心概念:Observable(可观察对象)、Observer(观察者)、Subscription(订阅)
- 操作符:创建、转换、过滤、组合、错误处理等
- Subject:多播、BehaviorSubject、ReplaySubject
- 实际应用:搜索、拖拽、WebSocket、表单验证
掌握 RxJS 需要理解几个关键点:
- Observable 是惰性的,只有订阅后才会执行
- 每次订阅都会创建新的执行
- 记得取消订阅,避免内存泄漏
- 使用操作符组合和转换数据流
- 选择正确的映射操作符(mergeMap、switchMap、concatMap、exhaustMap)
FRP 的学习曲线可能较陡,但一旦掌握,你会发现处理异步和事件驱动的代码变得优雅而简洁。